Examples

Comprehensive examples showing different use cases for PG Scheduler.

Quick Examples

Simple Job Scheduling

import asyncio
import asyncpg
from datetime import datetime, timedelta, UTC
from pg_scheduler import Scheduler, JobPriority

async def send_email(recipient: str, subject: str):
    print(f"📧 Sending email to {recipient}: {subject}")
    await asyncio.sleep(1)  # Simulate sending
    print(f"✅ Email sent to {recipient}")

async def main():
    db_pool = await asyncpg.create_pool(
        user='scheduler',
        password='password',
        database='scheduler_db',
        host='localhost'
    )
    
    scheduler = Scheduler(db_pool)
    await scheduler.start()
    
    try:
        # Schedule immediate job
        await scheduler.schedule(
            send_email,
            execution_time=datetime.now(UTC) + timedelta(seconds=10),
            args=("user@example.com", "Welcome!"),
            priority=JobPriority.NORMAL
        )
        
        # Schedule high priority job
        await scheduler.schedule(
            send_email,
            execution_time=datetime.now(UTC) + timedelta(seconds=5),
            args=("admin@example.com", "System Alert"),
            priority=JobPriority.CRITICAL,
            max_retries=3
        )
        
        await asyncio.sleep(30)
        
    finally:
        await scheduler.shutdown()
        await db_pool.close()

asyncio.run(main())

Periodic Jobs

from pg_scheduler import periodic, JobPriority
from datetime import timedelta

@periodic(every=timedelta(minutes=15))
async def cleanup_temp_files():
    """Clean temporary files every 15 minutes"""
    print("🧹 Cleaning temporary files...")
    # Your cleanup logic
    print("✅ Cleanup complete")

@periodic(
    every=timedelta(hours=1),
    priority=JobPriority.CRITICAL,
    max_retries=3
)
async def generate_reports():
    """Generate hourly reports with retries"""
    print("📊 Generating reports...")
    # Your report logic
    print("✅ Reports generated")

@periodic(
    every=timedelta(minutes=30),
    use_advisory_lock=True
)
async def database_backup():
    """Exclusive database backup"""
    print("💾 Starting database backup...")
    # Your backup logic
    print("✅ Backup complete")

Error Handling and Retries

import random
from pg_scheduler import periodic
from datetime import timedelta

@periodic(every=timedelta(minutes=5), max_retries=3)
async def flaky_job():
    """Job that sometimes fails"""
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Random failure occurred")
    
    print("✅ Job completed successfully")

@periodic(every=timedelta(minutes=10))
async def robust_job():
    """Job with built-in error handling"""
    try:
        # Your potentially failing logic
        await risky_operation()
        print("✅ Operation completed")
    except Exception as e:
        print(f"⚠️ Operation failed: {e}")
        # Handle the error gracefully
        await fallback_operation()

Job Management

async def manage_periodic_jobs(scheduler):
    """Example of managing periodic jobs at runtime"""
    
    # Get all periodic jobs
    jobs = scheduler.get_periodic_jobs()
    print(f"Found {len(jobs)} periodic jobs")
    
    for dedup_key, config in jobs.items():
        status = scheduler.get_periodic_job_status(dedup_key)
        print(f"Job: {status['job_name']}")
        print(f"  Interval: {status['interval']}s")
        print(f"  Enabled: {status['enabled']}")
        print(f"  Priority: {status['priority']}")
        
        # Disable a specific job
        if status['job_name'] == 'cleanup_temp_files':
            scheduler.disable_periodic_job(dedup_key)
            print("  → Disabled cleanup job")
        
        # Manually trigger a job
        if status['job_name'] == 'generate_reports':
            job_id = await scheduler.trigger_periodic_job(dedup_key)
            print(f"  → Manually triggered: {job_id}")

Production Configuration

from pg_scheduler import (
    Scheduler, 
    VacuumConfig, 
    VacuumPolicy,
    VacuumTrigger
)

# Configure vacuum policies
vacuum_config = VacuumConfig(
    completed=VacuumPolicy.after_days(1),     # Clean completed jobs after 1 day
    failed=VacuumPolicy.after_days(7),        # Keep failed jobs for 7 days
    cancelled=VacuumPolicy.after_days(3),     # Clean cancelled jobs after 3 days
    interval_minutes=60,                      # Run vacuum every hour
    track_metrics=True                        # Store vacuum statistics
)

# Production scheduler configuration
scheduler = Scheduler(
    db_pool=db_pool,
    max_concurrent_jobs=50,      # Higher concurrency for production
    misfire_grace_time=300,      # 5 minute grace period
    vacuum_config=vacuum_config,
    vacuum_enabled=True
)

Integration Examples

FastAPI Integration

from fastapi import FastAPI, BackgroundTasks
from pg_scheduler import Scheduler
import asyncpg

app = FastAPI()
scheduler = None

@app.on_event("startup")
async def startup_event():
    global scheduler
    db_pool = await asyncpg.create_pool(...)
    scheduler = Scheduler(db_pool)
    await scheduler.start()

@app.on_event("shutdown")
async def shutdown_event():
    if scheduler:
        await scheduler.shutdown()

@app.post("/schedule-email")
async def schedule_email(recipient: str, subject: str):
    job_id = await scheduler.schedule(
        send_email,
        execution_time=datetime.now(UTC) + timedelta(minutes=5),
        args=(recipient, subject)
    )
    return {"job_id": job_id}

Django Integration

# In your Django app
from django.core.management.base import BaseCommand
from pg_scheduler import Scheduler, periodic
import asyncpg
import asyncio

@periodic(every=timedelta(hours=1))
async def sync_user_data():
    """Sync user data periodically"""
    # Your Django model operations
    pass

class Command(BaseCommand):
    def handle(self, *args, **options):
        asyncio.run(self.run_scheduler())
    
    async def run_scheduler(self):
        db_pool = await asyncpg.create_pool(...)
        scheduler = Scheduler(db_pool)
        await scheduler.start()
        
        try:
            # Keep running
            while True:
                await asyncio.sleep(60)
        finally:
            await scheduler.shutdown()