Variables vs. Type Aliases in Python

In Python, variables can have type annotations to indicate the type of value they are expected to hold. This is particularly useful for static type checkers and for improving code readability. When defining a variable with a type annotation, you explicitly specify the type:

from typing import Type

class A:
    ...

tp: Type[A] = A

In this example:

  • tp is a variable with a type annotation.
  • Type[A] indicates that tp should hold a type object corresponding to class A.
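
Because tp holds the class object itself rather than an instance, it can be called to construct instances. A minimal sketch (the make helper is hypothetical, added for illustration):

from typing import Type

class A:
    def __init__(self) -> None:
        self.value = 42

def make(cls: Type[A]) -> A:
    # cls is the class object, so calling it creates an instance
    return cls()

tp: Type[A] = A
obj = make(tp)
print(obj.value)  # 42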

Type Aliases

A type alias gives a new name to an existing type, which can make your code more readable, especially when dealing with complex types. Traditionally, a type alias is created by a plain assignment, with no type annotation, at the top level of a module:

class A:
    ...

Alias = A

Here:

  • Alias is a type alias for class A.
  • This does not create a new type but simply provides an alternative name for A.

Using type aliases can simplify type annotations and make your code more descriptive.
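
For example, an alias can shorten a nested container type; the names below are illustrative:

from typing import Dict, List

# Hypothetical alias: usernames mapped to lists of scores
ScoreTable = Dict[str, List[float]]

def average(scores: ScoreTable) -> Dict[str, float]:
    # The alias keeps the signature short and readable
    return {name: sum(vals) / len(vals) for name, vals in scores.items()}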

Explicit Type Aliases with TypeAlias (PEP 613)

PEP 613 introduced the TypeAlias annotation to declare type aliases explicitly. This can be especially useful in larger projects or when defining type aliases in class bodies or functions. TypeAlias was added to the typing module in Python 3.10; on Python 3.9 and earlier, import it from typing_extensions:

from typing import TypeAlias  # "from typing_extensions" in Python 3.9 and earlier

class A:
    ...

Alias: TypeAlias = A

Using TypeAlias makes it clear that Alias is intended to be a type alias, not a variable. This explicitness enhances code readability and maintainability.
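
The annotation is especially helpful when the right-hand side is a string forward reference, which would otherwise be indistinguishable from an ordinary str assignment:

from typing import TypeAlias

Alias: TypeAlias = 'A'  # recognized as a type alias, not a str variable

class A:
    ...

Note that on Python 3.12 and later, PEP 695 adds a dedicated type statement (type Alias = A) as the newest way to declare aliases.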

Calling Python Celery Tasks from a Different Machine Using send_task

Prerequisites

To follow along, you will need:

  • Python installed on both the client and worker machines.
  • Celery and a message broker (RabbitMQ) installed. Redis will be used as the result backend.
  • Basic knowledge of Python and familiarity with Celery.

Step 1: Setup the Worker

First, let’s set up the Celery worker. On the worker machine, create a file named tasks.py:

from celery import Celery

app = Celery("tasks", broker='amqp://username:password@localhost',
             backend='redis://localhost:6379/0')

@app.task(name='celery_project.tasks.add')
def add(x, y):
    return x + y

Here, we define a simple task named add that takes two arguments and returns their sum. Adjust the broker and backend URLs to point to your actual RabbitMQ and Redis services.

Step 2: Start the Celery Worker

Run the following command on the worker machine to start the Celery worker:

.venv\Scripts\python.exe -m celery -A tasks worker --loglevel=info -E --pool=solo

This command (shown with a Windows virtual-environment path) starts a Celery worker that listens for tasks to execute.

Step 3: Setup the Client

On the client machine, you don't need the task definitions at all. All that is required is a Celery app pointed at the same broker and backend, plus the task's registered name. Create a file named main.py:

from celery import Celery

app = Celery("tasks", broker='amqp://username:password@localhost',
             backend='redis://localhost:6379/0')

result = app.send_task('celery_project.tasks.add', args=[4, 4])
print(result.get())

Here, send_task is used to dispatch the task. It requires the name of the task (which must match the name given in the worker’s task decorator) and the arguments for the task.
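
send_task also accepts the usual Celery execution options; for example, keyword arguments and a countdown delay (a sketch, reusing the app configured above):

# Dispatch with keyword arguments and a 10-second delay before execution
result = app.send_task(
    'celery_project.tasks.add',
    kwargs={'x': 4, 'y': 4},
    countdown=10,
)
print(result.get(timeout=30))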

Step 4: Calling the Task from the Client

Run the main.py script on the client machine:

python main.py

This script sends the add task to the worker machine via the message broker, and then fetches the result using result.get().

Alternative: Use Minimal Task Definitions

On the client side, you only need a minimal definition of the tasks to send them. You can redefine the tasks in a simple module that just includes the task names, without their implementations:

client_tasks.py:

from celery import Celery

# A result backend is required if you call result.get() on the client
app = Celery('client_tasks',
             broker='pyamqp://guest@your_broker_ip//',
             backend='redis://your_backend_ip:6379/0')

@app.task(name='your_module_name.tasks.add')
def add(x, y):
    pass  # Implementation is not needed on the client

Then on the client:

from client_tasks import add
result = add.delay(4, 4)
print(result.get(timeout=10))

Using Celery in Python with tasks defined in different modules

Setup

Requirements

To get started, you will need Python installed on your system. Additionally, you will need RabbitMQ and Redis. You can install RabbitMQ and Redis on your local machine or use Docker containers.

Python Dependencies

Install Celery using pip:

pip install celery

Project Structure

Here’s a simple project structure to organize your Celery tasks:

celery_project/
│
├── celery_app.py    # Celery configuration and instance
├── task1.py         # Module for 'add' task
├── task2.py         # Module for 'multiply' task
└── main.py          # Main script to execute tasks

Celery Configuration

In celery_app.py, we configure our Celery application:

from celery import Celery

app = Celery("tasks", broker='amqp://username:password@localhost',
             backend='redis://localhost:6379/0',
             include=['task1', 'task2'])

if __name__ == '__main__':
    app.start()

  • broker: The URL of the RabbitMQ server.
  • backend: The URL of the Redis server used to store task results.
  • include: List of modules to include so Celery knows where to find the defined tasks.
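
As an alternative to listing every module in include, Celery can locate tasks itself with autodiscover_tasks; a sketch, assuming the tasks live in a module named tasks.py inside a package called celery_project:

from celery import Celery

app = Celery("tasks", broker='amqp://username:password@localhost',
             backend='redis://localhost:6379/0')

# Look for a tasks.py module inside each listed package
app.autodiscover_tasks(['celery_project'])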

Defining Tasks

Tasks are defined in task1.py and task2.py:

task1.py:

from celery_app import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info(f'Starting to add {x} + {y}')
    result = x + y
    logger.info(f'Task completed with result {result}')
    return result

task2.py:

from celery_app import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def multiply(x, y):
    logger.info(f'Starting to multiply {x} * {y}')
    result = x * y
    logger.info(f'Task completed with result {result}')
    return result

Running Tasks

In main.py, we initiate and execute tasks asynchronously:

from task1 import add
from task2 import multiply

result1 = add.delay(1, 2)
result2 = multiply.delay(2, 3)

print("add: " + str(result1.get(timeout=10)))
print("multiply: " + str(result2.get(timeout=10)))

Running Celery Worker

To run the Celery worker, use the following command:

.venv\Scripts\python.exe -m celery -A celery_app worker --loglevel=info -E --pool=solo

Get Result from Asynchronous Celery Tasks in Python

Setting Up the Project

First, let’s set up our Celery instance in a file named tasks.py. This setup configures Celery with RabbitMQ as the message broker and an RPC backend for storing task results:

from celery import Celery
from celery.utils.log import get_task_logger

# Initialize Celery application
app = Celery("tasks", broker='amqp://username:password@localhost', backend='rpc://')

# Create a logger
logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info(f'Starting to add {x} + {y}')
    try:
        result = x + y
        logger.info(f'Task completed with result {result}')
        return result
    except Exception:
        logger.error('Error occurred', exc_info=True)
        raise  # bare raise preserves the original traceback

In the code above, we define a Celery application named tasks, configured with a RabbitMQ broker. The logger records the operations and any errors encountered while tasks execute.

Invoking Asynchronous Tasks

Next, let’s write a main.py to invoke our asynchronous task and handle the result:

from celery.result import AsyncResult
from tasks import add

# Sending an asynchronous task
result: AsyncResult = add.delay(1, 2)

# Checking if the task is ready and retrieving the result
print(result.ready())  # Prints False if the task is not yet ready
print(result.get(timeout=10))  # Waits for the result up to 10 seconds

Here, add.delay(1, 2) sends an asynchronous task to add the numbers 1 and 2. The AsyncResult object allows us to check if the task is completed and to fetch the result once it is available.
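
Beyond ready() and get(), AsyncResult exposes the task's state, which can be polled without blocking; a small sketch:

import time

from tasks import add

result = add.delay(1, 2)

# Poll the task state without blocking on the result
while not result.ready():
    print(result.state)  # e.g. PENDING or STARTED
    time.sleep(0.5)

if result.successful():
    print(result.result)  # the task's return value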

Running the Celery Worker

To execute the tasks, we need to run a Celery worker. Due to compatibility issues with Windows, we use the --pool=solo option:

.venv\Scripts\python.exe -m celery -A tasks worker --loglevel=info -E --pool=solo

The --pool=solo option is crucial for running Celery on Windows as it avoids issues that arise from the default prefork pool, which is not fully supported on Windows platforms.

Simplifying Asynchronous Task Execution with Celery in Python

Setting up the Celery Application

First, we need to set up our Celery application. This involves specifying the message broker and defining tasks. A message broker is a mechanism responsible for transferring data between the application and Celery workers. In our example, we use RabbitMQ as the broker.

Here is the code snippet for setting up a Celery application, saved in a file named tasks.py:

from celery import Celery

# Create a Celery instance
app = Celery("tasks", broker='amqp://username:password@localhost')

# Define a simple task to add two numbers
@app.task
def add(x, y):
    return x + y

In this setup, Celery is initialized with a name (“tasks”) and a broker URL, which includes the username, password, and server location (in this case, localhost for local development).

Defining a Task

We define a simple task using the @app.task decorator. This task, add, takes two parameters, x and y, and returns their sum. The decorator marks this function as a task that Celery can manage.

Calling the Task Asynchronously

To call our add task asynchronously, we use the following code snippet in main.py:

from tasks import add

# Call the add task asynchronously
result = add.delay(1, 2)
print("Task sent to the Celery worker!")

The delay method is a convenient shortcut provided by Celery for executing the task asynchronously. When add.delay(1, 2) is called, Celery serializes the call and places it on the queue, where a worker picks it up.
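
delay is itself a shortcut for the more general apply_async, which accepts execution options such as a countdown:

from tasks import add

# Equivalent to add.delay(1, 2), but executed no sooner than 5 seconds from now
result = add.apply_async((1, 2), countdown=5)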

Running Celery Workers

To execute the tasks in the queue, we need to run Celery workers. Assuming you’ve activated a virtual environment, you can start a Celery worker using the following command:

.venv\Scripts\celery.exe -A tasks worker --loglevel=info

This command starts a Celery worker with a log level of info, which provides a moderate amount of logging output. Here, -A tasks tells Celery that our application is defined in the tasks.py file.

Install chromedriver Automatically while using Selenium in Python

pip install webdriver-manager

With the package installed, a matching chromedriver can be downloaded and wired up at runtime:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Download (or reuse a cached copy of) the matching chromedriver
service = Service(executable_path=ChromeDriverManager().install())

chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument("--start-maximized")
chrome_options.add_experimental_option("useAutomationExtension", False)
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
# Run without opening a browser window
chrome_options.add_argument("--headless")

driver = webdriver.Chrome(options=chrome_options, service=service)


Deploy applications in Run as Administrator mode in Windows using PyInstaller and Inno Setup

PyInstaller

The --uac-admin option asks PyInstaller to embed a UAC manifest requesting elevation, and --manifest supplies the custom manifest shown below:

venv\Scripts\pyinstaller.exe --noconsole --onefile --icon=images/icons8-anki-512.ico --manifest=MyAnki.exe.manifest --uac-admin -n MyAnki app.py

MyAnki.exe.manifest

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<assembly xmlns="urn:schemas-microsoft-com:asm.v1" manifestVersion="1.0">
  <trustInfo xmlns="urn:schemas-microsoft-com:asm.v3">
    <security>
      <requestedPrivileges>
        <requestedExecutionLevel level="requireAdministrator" uiAccess="false" />
      </requestedPrivileges>
    </security>
  </trustInfo>
</assembly>

inno_setup.iss

[Setup]
PrivilegesRequired=admin


Modifying the column names of a Pandas DataFrame

Rename Specific Columns

import pandas as pd

# Create a DataFrame
df = pd.DataFrame({
    'Name': ['Alice', 'Bob'],
    'Age': [25, 30]
})

# Rename the 'Name' column to 'Full Name' and 'Age' to 'Age in Years'
df.rename(columns={'Name': 'Full Name', 'Age': 'Age in Years'}, inplace=True)

print(df)
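
By default, rename silently ignores dictionary keys that don't match any column; passing errors='raise' (available in recent pandas versions) turns a misspelled name into a KeyError instead:

# Fail loudly if a column name is misspelled
df.rename(columns={'Full Name': 'Name'}, errors='raise', inplace=True)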

Modify All Headers at Once

import pandas as pd

# Create a DataFrame
df = pd.DataFrame({
    'Name': ['Alice', 'Bob'],
    'Age': [25, 30]
})

# New column names
new_columns = ['Full Name', 'Age in Years']

# Assign the new column names to the DataFrame
df.columns = new_columns

print(df)

Apply a Function to Headers

import pandas as pd

# Create a DataFrame
df = pd.DataFrame({
    'Name': ['Alice', 'Bob'],
    'Age': [25, 30]
})

# Make all column names uppercase
df.columns = df.columns.str.upper()

print(df)
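
Any function can be applied this way; a common pattern is normalizing headers to snake_case:

# Normalize headers: lowercase and replace spaces with underscores
df.columns = [c.lower().replace(' ', '_') for c in df.columns]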


Remove Column from DataFrame in Pandas

import pandas as pd

# Create a DataFrame
df = pd.DataFrame({
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 35],
    'City': ['New York', 'Los Angeles', 'Chicago']
})

# Print the original DataFrame
print("Original DataFrame:")
print(df)

# Remove the 'Age' column
df = df.drop(columns=['Age'])

# Print the updated DataFrame
print("\nDataFrame After Removing 'Age' Column:")
print(df)

Remember to assign the result back to the DataFrame (or to a new variable) if you want to keep the change. Alternatively, pass the inplace=True argument to modify the DataFrame in place without reassignment:

df.drop(columns=['Age'], inplace=True)
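
Note that drop raises a KeyError if a listed column doesn't exist; pass errors='ignore' to skip missing labels silently:

# No error even if 'Age' was already removed
df.drop(columns=['Age'], inplace=True, errors='ignore')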

Remove a Column by Its Index

import pandas as pd

# Create a DataFrame
df = pd.DataFrame({
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 35],
    'City': ['New York', 'Los Angeles', 'Chicago']
})

# Print the original DataFrame
print("Original DataFrame:")
print(df)

# Index of the column to be removed
column_index = 1

# Get the name of the column at the specified index
column_name = df.columns[column_index]

# Drop the column by its name
df.drop(columns=[column_name], inplace=True)

# Print the updated DataFrame
print("\nDataFrame After Removing Column at Index 1:")
print(df)

Remove multiple columns at once from a DataFrame in Pandas

import pandas as pd

# Create a DataFrame
df = pd.DataFrame({
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 35],
    'City': ['New York', 'Los Angeles', 'Chicago'],
    'Country': ['USA', 'USA', 'USA']
})

# Print the original DataFrame
print("Original DataFrame:")
print(df)

# List of columns to be removed
columns_to_remove = ['Age', 'Country']

# Drop the specified columns
df.drop(columns=columns_to_remove, inplace=True)

# Print the updated DataFrame
print("\nDataFrame After Removing Columns:")
print(df)

If you want to remove multiple columns by their indices, you can use the following code (starting again from the original four-column DataFrame):

# List of column indices to be removed ('Age' is at index 1, 'Country' at index 3)
column_indices_to_remove = [1, 3]

# Get the names of the columns at the specified indices
columns_to_remove = df.columns[column_indices_to_remove]

# Drop the specified columns
df.drop(columns=columns_to_remove, inplace=True)


Combine two DataFrames in Pandas

Using concat to Stack DataFrames Vertically

If the two DataFrames have the same columns and you want to stack them vertically, you can use the pd.concat function:

import pandas as pd

# Define the first DataFrame
df1 = pd.DataFrame({
    'Name': ['Alice', 'Bob'],
    'Age': [25, 30],
    'City': ['New York', 'Los Angeles']
})

# Define the second DataFrame
df2 = pd.DataFrame({
    'Name': ['Charlie', 'David'],
    'Age': [35, 40],
    'City': ['Chicago', 'Houston']
})

# Concatenate the two DataFrames
result = pd.concat([df1, df2])

# Print the result
print(result)
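
By default, concat keeps each frame's original row index, so the result above has duplicate labels 0 and 1. Pass ignore_index=True to build a fresh 0..n-1 index instead:

# Rebuild a clean RangeIndex for the combined frame
result = pd.concat([df1, df2], ignore_index=True)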

Using merge to Join DataFrames Horizontally

If you want to join two DataFrames based on a common column (for example, an ID), you can use the pd.merge function:

# Define the first DataFrame
df1 = pd.DataFrame({
    'ID': [1, 2],
    'Name': ['Alice', 'Bob']
})

# Define the second DataFrame
df2 = pd.DataFrame({
    'ID': [1, 2],
    'Age': [25, 30]
})

# Merge the two DataFrames on the 'ID' column
result = pd.merge(df1, df2, on='ID')

# Print the result
print(result)
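
By default, merge performs an inner join: rows whose ID appears in only one frame are dropped. The how parameter selects other join types:

# Keep all IDs from both frames, filling missing values with NaN
result = pd.merge(df1, df2, on='ID', how='outer')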