Understanding Celery Signals: A Deep Dive

May 14, 2024

Why Do We Need Celery Signals?

ELI5 (Explain Like I'm Five)

Celery signals are like helpful alerts that tell you what's happening with your tasks, just like when you're baking cookies and someone tells you when the dough is ready, when the cookies are baking, and when they're done. These alerts help you keep track of everything and fix any problems quickly.

In-Depth Explanation

Celery signals provide hooks to execute custom actions at various points in the task lifecycle, such as before a task starts, after it finishes, or if it fails. This capability is essential for several reasons:

  • Enhanced Logging: Signals can automatically log task events, providing detailed insights into task execution, success, or failure. This information is crucial for debugging and monitoring the health of your system.
  • Custom Monitoring: By connecting to signals, you can implement custom monitoring solutions that track task performance and notify you of any issues, ensuring your distributed system remains robust and reliable.
  • Error Handling: Signals allow for sophisticated error handling by executing specific actions when a task fails, such as sending alerts, retrying tasks, or triggering fallback mechanisms.
  • Flexibility and Control: They offer developers fine-grained control over task execution, enabling them to customize behavior dynamically based on the task state, improving the overall efficiency and reliability of the system.


Celery Task Execution Lifecycle with Signals

An illustration of a Celery task execution lifecycle showing various signals.
Illustration by Irene M. Ray from Ouch!

Celery is a powerful, distributed task queue that allows you to handle asynchronous tasks and manage them efficiently. One of the most useful features of Celery is its support for signals, which are hooks that enable you to execute custom actions at various points in the task lifecycle. In this blog post, we'll explore what Celery signals are, how to use them effectively, and provide detailed examples.

What are Celery Signals?

ELI5 (Explain Like I'm Five):

Celery signals are like special notifications that let you know what's happening with your tasks. Imagine baking cookies: a signal would tell you when the dough is mixed, when the cookies are in the oven, and when they're done. This helps you keep track of everything and handle any surprises, making sure your tasks run smoothly.

In-Depth:

Celery signals are hooks that allow you to connect certain actions to specific events within the Celery task lifecycle. They provide a way to execute custom code at various points, such as before a task starts, after a task finishes, or when a task fails. This can be useful for logging, monitoring, or modifying task behavior dynamically.

Common Celery Signals

Here are some of the most commonly used Celery signals:

  • task_prerun: Sent before a task is executed.
  • task_postrun: Sent after a task has been executed.
  • task_success: Sent when a task completes successfully.
  • task_failure: Sent when a task fails.
  • task_retry: Sent when a task is retried.
  • task_revoked: Sent when a task is revoked.
  • worker_ready: Sent when a worker starts up and is ready.
  • worker_shutdown: Sent when a worker is shutting down.

Using Celery Signals

To use Celery signals, you need to connect a receiver function to the signal you are interested in. This is typically done using decorators. Let's look at how to set up and use these signals.

Basic Example

from celery.signals import task_prerun, task_postrun, task_success, task_failure

# Define a receiver function for the task_prerun signal
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    print(f"Task {task_id} is about to start")

# Define a receiver function for the task_postrun signal
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, **kwargs):
    print(f"Task {task_id} has finished")

# Define a receiver function for the task_success signal
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    print(f"Task {sender.name} completed successfully with result: {result}")

# Define a receiver function for the task_failure signal
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kwargs):
    print(f"Task {task_id} failed with exception: {exception}")

In this example, we define receiver functions for the `task_prerun`, `task_postrun`, `task_success`, and `task_failure` signals. Each receiver is connected to its signal with the corresponding `@signal.connect` decorator, and Celery calls it automatically when the event occurs.

Practical Examples

Logging Task Events

Logging is a common use case for Celery signals. Let's create an example where we log task events to a file:

import logging
from celery.signals import task_prerun, task_postrun, task_success, task_failure

# Set up logging
logger = logging.getLogger(__name__)
handler = logging.FileHandler('task_events.log')
formatter = logging.Formatter('%(asctime)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Define a receiver function for the task_prerun signal
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    logger.info(f"Task {task_id} is about to start")

# Define a receiver function for the task_postrun signal
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, **kwargs):
    logger.info(f"Task {task_id} has finished")

# Define a receiver function for the task_success signal
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    logger.info(f"Task {sender.name} completed successfully with result: {result}")

# Define a receiver function for the task_failure signal
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kwargs):
    logger.error(f"Task {task_id} failed with exception: {exception}")

In this example, we set up a logger to write task events to a file named task_events.log. Each signal handler logs a message indicating the state of the task.

Monitoring Task Execution Time

Monitoring the execution time of tasks is another practical application of Celery signals. Let's create an example to achieve this:

import time
from celery.signals import task_prerun, task_postrun

# Dictionary to store start times of tasks
task_start_times = {}

# Define a receiver function for the task_prerun signal
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    task_start_times[task_id] = time.time()

# Define a receiver function for the task_postrun signal
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, **kwargs):
    start_time = task_start_times.pop(task_id, None)
    if start_time is not None:
        execution_time = time.time() - start_time
        print(f"Task {task_id} executed in {execution_time:.2f} seconds")

In this example, we use the task_prerun and task_postrun signals to record the start and end times of tasks. The execution time is calculated and printed when the task completes.

Handling Task Failures Gracefully

It's important to handle task failures gracefully, especially in production environments. Let's create an example to handle task failures:

import logging
from celery.signals import task_failure

# Set up logging
logger = logging.getLogger(__name__)
handler = logging.FileHandler('task_failures.log')
formatter = logging.Formatter('%(asctime)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.ERROR)

# Define a receiver function for the task_failure signal
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kwargs):
    logger.error(f"Task {task_id} failed with exception: {exception}")
    # Additional logic to handle the failure
    # For example, sending an email notification or retrying the task

In this example, we set up a logger to write task failure events to a file named task_failures.log. Additionally, you can add custom logic to handle the failure, such as sending an email notification or retrying the task.

Sample Task with Signals

Let's create a sample Celery task and demonstrate how to connect signals to it. We will also show the output of the task execution.

Sample Task

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True)
def add(self, x, y):
    return x + y

In this example, we define a simple task named add that takes two arguments and returns their sum.

Connecting Signals

Now, let's connect signals to this task:

import logging
from celery.signals import task_prerun, task_postrun, task_success, task_failure

# Set up logging
logger = logging.getLogger(__name__)
handler = logging.FileHandler('task_signals.log')
formatter = logging.Formatter('%(asctime)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Define a receiver function for the task_prerun signal
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    logger.info(f"Task {task_id} is about to start")

# Define a receiver function for the task_postrun signal
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, **kwargs):
    logger.info(f"Task {task_id} has finished")

# Define a receiver function for the task_success signal
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    logger.info(f"Task {sender.name} completed successfully with result: {result}")

# Define a receiver function for the task_failure signal
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kwargs):
    logger.error(f"Task {task_id} failed with exception: {exception}")

In this example, we set up a logger to write task events to a file named task_signals.log. Each signal handler logs a message indicating the state of the task.

Executing the Sample Task

To execute the sample task and see the output, run the following code:

from tasks import add

result = add.apply_async((4, 6))
print(result.get())

Output

Here is the expected output in the task_signals.log file:

2024-05-15 12:00:00 Task c72a1b2a-77b2-4b6a-8b7a-1a2b3c4d5e6f is about to start
2024-05-15 12:00:00 Task tasks.add completed successfully with result: 10
2024-05-15 12:00:00 Task c72a1b2a-77b2-4b6a-8b7a-1a2b3c4d5e6f has finished