Calling Python Celery Tasks from a Different Machine Using send_task

Prerequisites

To follow along, you will need:

  • Python installed on both the client and worker machines.
  • Celery and a message broker (RabbitMQ) installed. Redis will be used as the result backend.
  • Basic knowledge of Python and familiarity with Celery.
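
If Celery itself is not installed yet, the Redis client for the result backend can be pulled in via Celery's redis extra (RabbitMQ support ships with Celery's default AMQP transport):

pip install "celery[redis]"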

Step 1: Setup the Worker

First, let’s set up the Celery worker. On the worker machine, create a file named tasks.py:

from celery import Celery

app = Celery("tasks", broker='amqp://username:password@localhost',
             backend='redis://localhost:6379/0')

@app.task(name='celery_project.tasks.add')
def add(x, y):
    return x + y

Here, we define a simple task named add that takes two arguments and returns their sum. Note the explicit name in the decorator, 'celery_project.tasks.add': this is the name the client will use to address the task. Adjust the broker and backend URLs to point to your actual RabbitMQ and Redis services.
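
To keep credentials and hostnames out of source code, one option is to read the URLs from environment variables. Below is a minimal sketch; the variable names CELERY_BROKER_URL and CELERY_RESULT_BACKEND are our own convention here, not something Celery reads automatically:

import os
from celery import Celery

# Fall back to local defaults when the variables are not set.
broker_url = os.environ.get('CELERY_BROKER_URL', 'amqp://username:password@localhost')
backend_url = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

app = Celery("tasks", broker=broker_url, backend=backend_url)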

Step 2: Start the Celery Worker

Run the following command on the worker machine to start the Celery worker:

.venv\Scripts\python.exe -m celery -A tasks worker --loglevel=info -E --pool=solo

This command starts a Celery worker that listens for tasks to execute. The path shown is for a Windows virtual environment; the --pool=solo flag is explained in the Windows notes later in this guide.
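
Before wiring up the client, it is worth confirming that the worker registered the task under the expected name; Celery's inspect subcommand lists the registered tasks:

.venv\Scripts\python.exe -m celery -A tasks inspect registered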

Step 3: Setup the Client

On the client machine, you don’t need the task implementations at all; you only need a Celery app pointed at the same broker and result backend, plus the registered task name. Create a file named main.py:

from celery import Celery

app = Celery("tasks", broker='amqp://username:password@localhost',
             backend='redis://localhost:6379/0')

result = app.send_task('celery_project.tasks.add', args=[4, 4])
print(result.get())

Here, send_task is used to dispatch the task. It requires the name of the task (which must match the name given in the worker’s task decorator) and the arguments for the task.
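
send_task also accepts most of the routing and scheduling options of apply_async. Here is a sketch with a few common ones; the option values are illustrative:

result = app.send_task(
    'celery_project.tasks.add',
    args=[4, 4],        # positional arguments for the task
    kwargs={},          # keyword arguments, if any
    countdown=5,        # wait 5 seconds before the worker may execute it
    queue='celery',     # explicit queue name ('celery' is the default queue)
)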

Step 4: Calling the Task from the Client

Run the main.py script on the client machine:

python main.py

This script sends the add task to the worker machine via the message broker, and then fetches the result using result.get().
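
Note that result.get() blocks until the worker replies. If you prefer to poll instead of blocking, the returned AsyncResult exposes ready() and state; a small sketch:

import time

while not result.ready():
    print(f'Task state: {result.state}')  # e.g. 'PENDING' until a result is stored
    time.sleep(1)

print(result.get(timeout=10))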

Alternative: Use Minimal Task Definitions

On the client side, you only need a minimal definition of the tasks to send them. You can redefine the tasks in a simple module that just includes the task names, without their implementations:

client_tasks.py:

from celery import Celery

# A result backend is required so result.get() can fetch the return value.
app = Celery('client_tasks', broker='pyamqp://guest@your_broker_ip//',
             backend='redis://your_backend_ip:6379/0')

@app.task(name='celery_project.tasks.add')  # must match the name registered on the worker
def add(x, y):
    pass  # Implementation is not needed on the client

Then on the client:

from client_tasks import add
result = add.delay(4, 4)
print(result.get(timeout=10))
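
A third option, if you would rather not define stub functions at all, is Celery's signature API, which builds a call from the task's registered name alone. A minimal sketch reusing the app from client_tasks.py:

from client_tasks import app

# Build a call to the remote task by name; no local stub is required.
sig = app.signature('celery_project.tasks.add', args=(4, 4))
result = sig.delay()
print(result.get(timeout=10))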

Using Celery in Python with tasks defined in different modules

Setup

Requirements

To get started, you will need Python installed on your system. Additionally, you will need RabbitMQ and Redis. You can install RabbitMQ and Redis on your local machine or use Docker containers.

Python Dependencies

Install Celery using pip:

pip install celery

Project Structure

Here’s a simple project structure to organize your Celery tasks:

celery_project/
│
├── celery_app.py    # Celery configuration and instance
├── task1.py         # Module for 'add' task
├── task2.py         # Module for 'multiply' task
└── main.py          # Main script to execute tasks

Celery Configuration

In celery_app.py, we configure our Celery application:

from celery import Celery

app = Celery("tasks", broker='amqp://username:password@localhost',
             backend='redis://localhost:6379/0',
             include=['task1', 'task2'])

if __name__ == '__main__':
    app.start()

  • broker: The URL of the RabbitMQ server.
  • backend: The URL of the Redis server used to store task results.
  • include: List of modules to include so Celery knows where to find the defined tasks (see the optional configuration sketch after this list).
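
Beyond these three settings, app.conf.update() accepts the rest of Celery's configuration. Here is a sketch with a few commonly tuned options; the values are illustrative, not recommendations:

app.conf.update(
    task_serializer='json',    # serialize task messages as JSON
    result_serializer='json',  # serialize results as JSON
    result_expires=3600,       # discard stored results after one hour
    timezone='UTC',            # timezone used for scheduling decisions
)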

Defining Tasks

Tasks are defined in task1.py and task2.py:

task1.py:

from celery_app import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info(f'Starting to add {x} + {y}')
    result = x + y
    logger.info(f'Task completed with result {result}')
    return result

task2.py:

from celery_app import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def multiply(x, y):
    logger.info(f'Starting to multiply {x} * {y}')
    result = x * y
    logger.info(f'Task completed with result {result}')
    return result

Running Tasks

In main.py, we dispatch the tasks asynchronously and collect their results:

from task1 import add
from task2 import multiply

result1 = add.delay(1, 2)
result2 = multiply.delay(2, 3)

print("add: " + str(result1.get(timeout=10)))
print("multiply: " + str(result2.get(timeout=10)))

Running Celery Worker

To run the Celery worker, use the following command:

.venv\Scripts\python.exe -m celery -A celery_app worker --loglevel=info -E --pool=solo

Get Result from Asynchronous Celery Tasks in Python

Setting Up the Project

First, let’s set up our Celery instance in a file named tasks.py. This setup involves configuring Celery with RabbitMQ as the message broker and an RPC backend for storing task results:

from celery import Celery
from celery.utils.log import get_task_logger

# Initialize Celery application
app = Celery("tasks", broker='amqp://username:password@localhost', backend='rpc://')

# Create a logger
logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info(f'Starting to add {x} + {y}')
    try:
        result = x + y
        logger.info(f'Task completed with result {result}')
        return result
    except Exception:
        logger.error('Error occurred', exc_info=True)
        raise

In the code above, we define a Celery application named tasks, configured with a RabbitMQ broker and an RPC result backend. The logger records each task’s progress and any errors raised during execution.

Invoking Asynchronous Tasks

Next, let’s write a main.py to invoke our asynchronous task and handle the result:

from celery.result import AsyncResult
from tasks import add

# Sending an asynchronous task
result: AsyncResult = add.delay(1, 2)

# Checking if the task is ready and retrieving the result
print(result.ready())  # Prints False if the task is not yet ready
print(result.get(timeout=10))  # Waits for the result up to 10 seconds

Here, add.delay(1, 2) sends an asynchronous task to add the numbers 1 and 2. The AsyncResult object allows us to check if the task is completed and to fetch the result once it is available.
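
result.get(timeout=10) raises celery.exceptions.TimeoutError if no result arrives within ten seconds, so callers usually wrap it; a sketch:

from celery.exceptions import TimeoutError

try:
    print(result.get(timeout=10))
except TimeoutError:
    # The task has not finished yet; its state remains queryable.
    print(f'Timed out; current state: {result.state}')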

Running the Celery Worker

To execute the tasks, we need to run a Celery worker. Due to compatibility issues with Windows, we use the --pool=solo option:

.venv\Scripts\python.exe -m celery -A tasks worker --loglevel=info -E --pool=solo

The --pool=solo option is crucial for running Celery on Windows as it avoids issues that arise from the default prefork pool, which is not fully supported on Windows platforms.
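
If you need concurrent task execution on Windows, recent Celery versions also ship a thread-based pool that likewise avoids prefork; for example:

.venv\Scripts\python.exe -m celery -A tasks worker --loglevel=info -E --pool=threads --concurrency=4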

Simplifying Asynchronous Task Execution with Celery in Python

Setting up the Celery Application

First, we need to set up our Celery application. This involves specifying the message broker and defining tasks. A message broker is a mechanism responsible for transferring data between the application and Celery workers. In our example, we use RabbitMQ as the broker.

Here is the code snippet for setting up a Celery application, saved in a file named tasks.py:

from celery import Celery

# Create a Celery instance
app = Celery("tasks", broker='amqp://username:password@localhost')

# Define a simple task to add two numbers
@app.task
def add(x, y):
    return x + y

In this setup, Celery is initialized with a name (“tasks”) and a broker URL, which includes the username, password, and server location (in this case, localhost for local development).

Defining a Task

We define a simple task using the @app.task decorator. This task, add, takes two parameters, x and y, and returns their sum. The decorator marks this function as a task that Celery can manage.
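
The decorator also takes options. As an illustration (this hypothetical flaky_add is not part of the example), bind=True hands the task its own request context, which enables retries:

@app.task(bind=True, max_retries=3)
def flaky_add(self, x, y):
    try:
        return x + y  # imagine a call here that can fail, e.g. a network request
    except Exception as exc:
        # Retry up to max_retries times, waiting 5 seconds between attempts.
        raise self.retry(exc=exc, countdown=5)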

Calling the Task Asynchronously

To call our add task asynchronously, we use the following code snippet in main.py:

from tasks import add

# Call the add task asynchronously
result = add.delay(1, 2)
print("Task sent to the Celery worker!")

The delay method is a convenient shortcut provided by Celery for executing a task asynchronously. When add.delay(1, 2) is called, Celery serializes the call and places it on the queue, where a worker picks it up.
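
delay(1, 2) is shorthand for apply_async(args=(1, 2)); the longer form exposes execution options when you need them (the values below are illustrative):

from tasks import add

# Equivalent to add.delay(1, 2), plus scheduling options.
result = add.apply_async(args=(1, 2), countdown=10, expires=60)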

Running Celery Workers

To execute the tasks in the queue, we need to run Celery workers. Assuming you’ve activated a virtual environment, you can start a Celery worker using the following command:

.venv\Scripts\celery.exe -A tasks worker --loglevel=info

This command starts a Celery worker with a log level of info, which provides a moderate amount of logging output. Here, -A tasks tells Celery that our application is defined in the tasks.py file.