Examples

Simple Job Scheduling

import asyncio
import asyncpg
from datetime import datetime, timedelta, UTC
from pg_scheduler import Scheduler, JobPriority

async def send_email(recipient: str, subject: str):
    print(f"Sending email to {recipient}: {subject}")
    await asyncio.sleep(1)
    print(f"Email sent to {recipient}")

async def main():
    db_pool = await asyncpg.create_pool(
        user="scheduler",
        password="password",
        database="scheduler_db",
        host="localhost",
    )

    scheduler = Scheduler(db_pool)
    await scheduler.start()

    try:
        await scheduler.schedule(
            send_email,
            execution_time=datetime.now(UTC) + timedelta(seconds=10),
            args=("user@example.com", "Welcome!"),
            priority=JobPriority.NORMAL,
        )

        await scheduler.schedule(
            send_email,
            execution_time=datetime.now(UTC) + timedelta(seconds=5),
            args=("admin@example.com", "System Alert"),
            priority=JobPriority.CRITICAL,
            max_retries=3,
        )

        await asyncio.sleep(30)
    finally:
        await scheduler.shutdown()
        await db_pool.close()

asyncio.run(main())

Periodic Jobs

from pg_scheduler import periodic, JobPriority
from datetime import timedelta

@periodic(every=timedelta(minutes=15))
async def cleanup_temp_files():
    print("Cleaning temporary files...")

@periodic(every=timedelta(hours=1), priority=JobPriority.CRITICAL, max_retries=3)
async def generate_reports():
    print("Generating reports...")

@periodic(every=timedelta(minutes=30), use_advisory_lock=True)
async def database_backup():
    print("Starting database backup...")

Cron Expressions

from pg_scheduler import periodic

@periodic(cron="0 0 * * *")
async def daily_midnight_job():
    print("Running at midnight")

@periodic(cron="*/15 * * * *")
async def every_fifteen_minutes():
    print("Running every 15 minutes")

@periodic(cron="0 9-17 * * MON-FRI", timezone="America/New_York")
async def business_hours_task():
    print("Running during business hours (Eastern)")

Bulk Scheduling

from pg_scheduler import Scheduler, JobSpec, JobPriority, ConflictResolution
from datetime import datetime, timedelta, UTC

async def send_notification(user_id: int):
    print(f"Notifying user {user_id}")

jobs = [
    JobSpec(
        func=send_notification,
        execution_time=datetime.now(UTC) + timedelta(seconds=i),
        args=(user_id,),
        priority=JobPriority.NORMAL,
    )
    for i, user_id in enumerate(range(1, 10001))
]

ids = await scheduler.schedule_bulk(
    jobs,
    conflict_resolution=ConflictResolution.IGNORE,
    batch_size=1000,
)
print(f"Scheduled {len(ids)} jobs")

Error Handling and Retries

import random
from pg_scheduler import periodic
from datetime import timedelta

@periodic(every=timedelta(minutes=5), max_retries=3)
async def flaky_job():
    if random.random() < 0.3:
        raise Exception("Random failure occurred")
    print("Job completed successfully")

@periodic(every=timedelta(minutes=10))
async def robust_job():
    try:
        await risky_operation()
    except Exception as e:
        print(f"Operation failed: {e}")
        await fallback_operation()

Job Management

async def manage_periodic_jobs(scheduler):
    jobs = scheduler.get_periodic_jobs()
    print(f"Found {len(jobs)} periodic jobs")

    for dedup_key, config in jobs.items():
        status = scheduler.get_periodic_job_status(dedup_key)
        print(f"Job: {status['job_name']}, Enabled: {status['enabled']}")

        # Disable a specific job
        if status["job_name"] == "cleanup_temp_files":
            scheduler.disable_periodic_job(dedup_key)

        # Manually trigger a job
        if status["job_name"] == "generate_reports":
            job_id = await scheduler.trigger_periodic_job(dedup_key)
            print(f"Manually triggered: {job_id}")

Conflict Resolution

from pg_scheduler import ConflictResolution

# Raise error if job_id already exists (default)
await scheduler.schedule(
    my_function,
    execution_time=run_time,
    job_id="custom-id",
    conflict_resolution=ConflictResolution.RAISE,
)

# Silently skip if job_id already exists
await scheduler.schedule(
    my_function,
    execution_time=run_time,
    job_id="custom-id",
    conflict_resolution=ConflictResolution.IGNORE,
)

# Overwrite the existing job
await scheduler.schedule(
    my_function,
    execution_time=run_time,
    job_id="custom-id",
    conflict_resolution=ConflictResolution.REPLACE,
)

FastAPI Integration

from contextlib import asynccontextmanager
from datetime import datetime, timedelta

from fastapi import FastAPI
import asyncpg
import asyncio

from pg_scheduler import Scheduler


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.db_pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        user="scheduler",
        password="scheduler123",
        database="scheduler_db",
    )
    app.state.scheduler = Scheduler(
        app.state.db_pool,
        max_concurrent_jobs=20,
        misfire_grace_time=120,
    )
    await app.state.scheduler.start()
    yield
    await app.state.scheduler.shutdown()
    await app.state.db_pool.close()


app = FastAPI(lifespan=lifespan)


async def process_order(order_id: str):
    await asyncio.sleep(1)
    print(f"Order {order_id} processed")


@app.post("/orders/{order_id}/process")
async def schedule_order(order_id: str):
    await app.state.scheduler.schedule(
        process_order,
        execution_time=datetime.now() + timedelta(seconds=10),
        args=(order_id,),
    )
    return {"status": "scheduled"}

Production Configuration

import os
from pg_scheduler import (
    Scheduler,
    PollingConfig,
    VacuumConfig,
    VacuumPolicy,
)

MAX_JOBS = int(os.getenv("MAX_CONCURRENT_JOBS", "25"))
MISFIRE_GRACE = int(os.getenv("MISFIRE_GRACE_TIME", "300"))
VACUUM_INTERVAL = int(os.getenv("VACUUM_INTERVAL_MINUTES", "60"))

scheduler = Scheduler(
    db_pool=db_pool,
    max_concurrent_jobs=MAX_JOBS,
    misfire_grace_time=MISFIRE_GRACE,
    batch_claim_limit=30,
    polling_config=PollingConfig(
        min_interval=0.02,
        max_interval=2.0,
    ),
    vacuum_enabled=True,
    vacuum_config=VacuumConfig(
        completed=VacuumPolicy.after_days(1),
        failed=VacuumPolicy.after_days(7),
        cancelled=VacuumPolicy.after_days(3),
        interval_minutes=VACUUM_INTERVAL,
    ),
)

Docker Compose

services:
  scheduler:
    image: my-scheduler:latest
    environment:
      - MAX_CONCURRENT_JOBS=50
      - MISFIRE_GRACE_TIME=300
      - VACUUM_INTERVAL_MINUTES=30
      - DATABASE_URL=postgresql://user:pass@db:5432/scheduler_db