PG Scheduler Documentation

Welcome to PG Scheduler - a robust job scheduler built on PostgreSQL with async I/O support.

Overview

PG Scheduler provides a comprehensive job scheduling solution with:

  • 🔄 Periodic Jobs: Simple @periodic decorator for recurring tasks

  • 🔒 Deduplication: Guarantees exactly one execution per window across replicas

  • ⚡ Self-Rescheduling: Jobs automatically schedule their next execution

  • 🛡️ Advisory Locks: Optional PostgreSQL advisory locks for exclusive execution

  • 🎯 Priority Queues: Support for job priorities and retry logic

  • 🧹 Vacuum Policies: Automatic cleanup of completed jobs

  • 💪 Reliability: Graceful shutdown, error handling, and orphan recovery

Quick Start

Installation

pip install pg-scheduler

Basic Usage

import asyncio
import asyncpg
from datetime import datetime, timedelta, UTC
from pg_scheduler import Scheduler, periodic, JobPriority

# Simple periodic job
@periodic(every=timedelta(minutes=15))
async def cleanup_temp_files():
    print("🧹 Cleaning up temporary files...")
    await asyncio.sleep(1)
    print("✅ Cleanup completed")

# Manual job scheduling
async def send_email(recipient: str, subject: str):
    print(f"📧 Sending email to {recipient}: {subject}")
    await asyncio.sleep(1)
    print(f"✅ Email sent")

async def main():
    # Create database connection
    db_pool = await asyncpg.create_pool(
        user='scheduler',
        password='password',
        database='scheduler_db',
        host='localhost'
    )
    
    # Initialize scheduler
    scheduler = Scheduler(db_pool=db_pool)
    await scheduler.start()
    
    try:
        # Schedule a job
        job_id = await scheduler.schedule(
            send_email,
            execution_time=datetime.now(UTC) + timedelta(minutes=5),
            args=("user@example.com", "Welcome!"),
            priority=JobPriority.NORMAL
        )
        
        # Let it run
        await asyncio.sleep(300)
        
    finally:
        await scheduler.shutdown()
        await db_pool.close()

if __name__ == "__main__":
    asyncio.run(main())

Features

Periodic Jobs

The @periodic decorator makes recurring jobs simple:

from datetime import timedelta
from pg_scheduler import periodic, JobPriority

@periodic(every=timedelta(hours=1), priority=JobPriority.CRITICAL)
async def generate_reports():
    """Generate hourly reports"""
    # Your code here
    pass

Reliability

Built-in reliability features include:

  • Cross-replica deduplication - Same job won’t run twice

  • Heartbeat monitoring - Detect crashed workers

  • Orphan recovery - Clean up abandoned jobs

  • Graceful shutdown - Wait for jobs to complete

  • Retry logic - Automatic retry with exponential backoff

Project Overview Features

  • PostgreSQL backend - Reliable, ACID-compliant storage

  • Async/await support - Built for modern Python

  • Docker friendly - Easy containerization

  • Monitoring - Comprehensive logging and metrics

  • Scalable - Run multiple replicas safely

Indices and tables