Quick Start Guide¶
This guide will get you up and running with PG Scheduler in minutes.
Installation¶
Requirements¶
Python 3.9+
PostgreSQL 12+
Install PG Scheduler¶
pip install pg-scheduler
Database Setup¶
Create a PostgreSQL database for the scheduler:
CREATE DATABASE scheduler_db;
CREATE USER scheduler WITH PASSWORD 'scheduler123';
GRANT ALL PRIVILEGES ON DATABASE scheduler_db TO scheduler;
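The scheduler automatically creates the required tables when it starts. On PostgreSQL 15 and later, a role that does not own the database cannot create tables in the public schema by default, so the scheduler user may also need a grant such as GRANT ALL ON SCHEMA public TO scheduler; run while connected to scheduler_db.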
Basic Example¶
Here’s a complete example showing both periodic and manual job scheduling:
import asyncio
import asyncpg
# timezone.utc works on Python 3.9+; datetime.UTC only exists on 3.11+
from datetime import datetime, timedelta, timezone
from pg_scheduler import Scheduler, periodic, JobPriority


# Define a periodic job
@periodic(every=timedelta(minutes=15))
async def cleanup_temp_files():
    """Clean up temporary files every 15 minutes"""
    print("🧹 Cleaning up temporary files...")
    # Your cleanup logic here
    await asyncio.sleep(2)  # Simulate work
    print("✅ Cleanup completed")


# Define a regular job function
async def send_welcome_email(user_email: str, user_name: str):
    """Send a welcome email to a new user"""
    print(f"📧 Sending welcome email to {user_name} ({user_email})")
    # Your email sending logic here
    await asyncio.sleep(1)  # Simulate email sending
    print(f"✅ Welcome email sent to {user_name}")


async def main():
    # Create database connection pool
    db_pool = await asyncpg.create_pool(
        user='scheduler',
        password='scheduler123',
        database='scheduler_db',
        host='localhost',
        port=5432
    )

    # Initialize scheduler
    scheduler = Scheduler(
        db_pool=db_pool,
        max_concurrent_jobs=10,
        misfire_grace_time=300  # 5 minutes
    )

    # Start the scheduler
    await scheduler.start()

    try:
        print("🚀 Scheduler started!")

        # Schedule a welcome email job
        job_id = await scheduler.schedule(
            send_welcome_email,
            execution_time=datetime.now(timezone.utc) + timedelta(minutes=1),
            args=("user@example.com", "John Doe"),
            priority=JobPriority.NORMAL,
            max_retries=3
        )
        print(f"📋 Scheduled welcome email job: {job_id}")

        # Check periodic jobs
        periodic_jobs = scheduler.get_periodic_jobs()
        print(f"📋 Found {len(periodic_jobs)} periodic jobs:")
        for dedup_key, config in periodic_jobs.items():
            status = scheduler.get_periodic_job_status(dedup_key)
            print(f" • {status['job_name']}: every {status['interval']}s")

        # Let the scheduler run for a while
        print("⏱️ Running for 5 minutes... (Ctrl+C to stop)")
        await asyncio.sleep(300)
    except KeyboardInterrupt:
        print("🛑 Shutting down...")
    finally:
        # Graceful shutdown
        await scheduler.shutdown()
        await db_pool.close()
        print("👋 Goodbye!")


if __name__ == "__main__":
    asyncio.run(main())
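The example above passes misfire_grace_time=300, which is given in seconds. The sketch below shows one common way such a grace window is applied, assuming it means that a job more than that many seconds late is skipped instead of run. That interpretation and the helper name should_run are assumptions for illustration, not confirmed PG Scheduler behavior.

```python
from datetime import datetime, timedelta, timezone


def should_run(scheduled_for: datetime, now: datetime, grace_seconds: int) -> bool:
    """Return True if a job is due and no later than the grace window.

    Hypothetical helper: it illustrates one common meaning of a misfire
    grace time. The library's actual rule may differ.
    """
    if now < scheduled_for:
        return False  # not due yet
    lateness = now - scheduled_for
    return lateness <= timedelta(seconds=grace_seconds)


due = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
on_time = should_run(due, due + timedelta(minutes=2), 300)   # 2 minutes late
too_late = should_run(due, due + timedelta(minutes=10), 300)  # 10 minutes late
print(on_time, too_late)
```

Under this assumed rule, a worker that was down for two minutes would still send the welcome email, while one that was down for ten minutes would treat the job as misfired.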
What Happens¶
When you run this example:
Database Setup: The scheduler creates necessary tables automatically
Periodic Job Registration: The @periodic decorator registers the cleanup job
Manual Job Scheduling: The welcome email is scheduled for 1 minute from now
Job Execution: Both jobs will execute at their scheduled times
Self-Rescheduling: The periodic job automatically schedules its next run
Graceful Shutdown: All active jobs complete before shutdown
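To make the self-rescheduling step concrete, here is a minimal sketch of how a next run time could be derived from the previous run and the interval. The function name next_run_time and the choice to skip intervals that were missed entirely are assumptions for illustration; the library may compute this differently.

```python
from datetime import datetime, timedelta, timezone


def next_run_time(last_run: datetime, every: timedelta, now: datetime) -> datetime:
    """Return the first slot strictly after `now` on the grid last_run + k * every.

    Hypothetical helper, not part of PG Scheduler.
    """
    if every <= timedelta(0):
        raise ValueError("every must be a positive interval")
    # timedelta // timedelta yields an int: how many whole intervals have elapsed
    intervals_passed = max(0, (now - last_run) // every)
    return last_run + (intervals_passed + 1) * every


last = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
every = timedelta(minutes=15)
soon = next_run_time(last, every, last + timedelta(minutes=2))
after_gap = next_run_time(last, every, last + timedelta(minutes=40))
print(soon.isoformat(), after_gap.isoformat())
```

Anchoring each slot to the previous run time, not to the moment the job finished, keeps a 15-minute job from drifting later with every execution.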
Next Steps¶
Read the User Guide for detailed features
Check out more Examples
Learn about Production Deployment
Explore the API Reference
Common Patterns¶
Priority Levels¶
# Critical priority - executes first
@periodic(every=timedelta(minutes=5), priority=JobPriority.CRITICAL)
async def monitor_system():
    """Critical system monitoring"""
    pass

# High priority
@periodic(every=timedelta(minutes=15), priority=JobPriority.HIGH)
async def important_task():
    """Important but not critical"""
    pass

# Normal priority (default)
@periodic(every=timedelta(hours=1), priority=JobPriority.NORMAL)
async def regular_task():
    """Regular background task"""
    pass

# Low priority - executes last
@periodic(every=timedelta(hours=6), priority=JobPriority.LOW)
async def cleanup_task():
    """Low priority cleanup"""
    pass
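The effect of priority levels can be pictured with a small in-memory queue: when several jobs are due at once, the most urgent one is taken first. This is only a sketch. The Priority enum and ReadyQueue class are hypothetical stand-ins, and PG Scheduler's real ordering happens in the database and may use different numeric values.

```python
import heapq
from enum import IntEnum
from itertools import count


class Priority(IntEnum):
    """Hypothetical stand-in for JobPriority; a lower value runs first."""
    CRITICAL = 1
    HIGH = 2
    NORMAL = 3
    LOW = 4


class ReadyQueue:
    """Min-heap of due jobs ordered by (priority, insertion order)."""

    def __init__(self):
        self._heap = []
        self._seq = count()  # tie-breaker keeps equal priorities first-in, first-out

    def push(self, priority: Priority, name: str) -> None:
        heapq.heappush(self._heap, (priority, next(self._seq), name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


queue = ReadyQueue()
queue.push(Priority.LOW, "cleanup_task")
queue.push(Priority.NORMAL, "regular_task")
queue.push(Priority.CRITICAL, "monitor_system")
queue.push(Priority.HIGH, "important_task")

order = [queue.pop() for _ in range(len(queue))]
print(order)
```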
Jobs with Retries¶
@periodic(every=timedelta(hours=1), max_retries=3)
async def generate_report():
    """Generate reports with retry on failure"""
    pass
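As a rough mental model of max_retries, the sketch below reruns a failing coroutine until it succeeds or the retry budget is spent. It assumes max_retries counts additional attempts after the first one and adds an optional exponential delay; both the run_with_retries helper and those semantics are assumptions, not the library's documented behavior.

```python
import asyncio


async def run_with_retries(job, max_retries: int, base_delay: float = 0.0):
    """Await job(); on an exception, retry up to max_retries more times.

    Hypothetical helper. The delay doubles after each failed attempt
    (base_delay, 2 * base_delay, 4 * base_delay, ...).
    """
    attempt = 0
    while True:
        try:
            return await job()
        except Exception:
            if attempt >= max_retries:
                raise  # retry budget exhausted: surface the last error
            attempt += 1
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


async def flaky_report():
    """Fails twice, then succeeds, to simulate a transient error."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "report ready"


result = asyncio.run(run_with_retries(flaky_report, max_retries=3))
print(result, calls["count"])
```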
Exclusive Jobs (Advisory Locks)¶
@periodic(every=timedelta(minutes=30), use_advisory_lock=True)
async def database_maintenance():
    """Maintenance that should only run on one worker"""
    pass
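PostgreSQL advisory locks are identified by a 64-bit integer key, so an exclusive job needs a stable number that every worker derives the same way. The sketch below shows one way to do that from a job name. The advisory_lock_key helper and the hashing scheme are assumptions for illustration; how PG Scheduler picks its keys is not stated here.

```python
import hashlib
import struct


def advisory_lock_key(job_name: str) -> int:
    """Map a job name to a signed 64-bit integer usable as an advisory lock key.

    Hypothetical helper: hashes the name and keeps the first 8 bytes.
    """
    digest = hashlib.sha256(job_name.encode("utf-8")).digest()
    (key,) = struct.unpack(">q", digest[:8])  # ">q" = big-endian signed 64-bit
    return key


# Standard PostgreSQL functions a worker could call with that key:
TRY_LOCK_SQL = "SELECT pg_try_advisory_lock($1)"  # true if this session got the lock
UNLOCK_SQL = "SELECT pg_advisory_unlock($1)"      # release it when the job finishes

key = advisory_lock_key("database_maintenance")
print(key)
```

A worker that receives false from pg_try_advisory_lock knows another worker is already running the job and can skip this run.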
Manual Job Management¶
# Get all periodic jobs
jobs = scheduler.get_periodic_jobs()
# Disable a job
scheduler.disable_periodic_job(dedup_key)
# Manually trigger a job
await scheduler.trigger_periodic_job(dedup_key)
# Check job status
status = scheduler.get_periodic_job_status(dedup_key)
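The calls above all take a dedup_key. One way to obtain it is to look the job up by name, as in this sketch. It assumes get_periodic_jobs() returns a mapping keyed by dedup_key and that each status is a dict with a job_name entry, as the basic example suggests; find_dedup_key itself is a hypothetical helper, not part of the library.

```python
from typing import Any, Optional


def find_dedup_key(scheduler: Any, job_name: str) -> Optional[Any]:
    """Return the dedup_key of the periodic job named job_name, or None.

    Hypothetical helper built only on the two scheduler methods shown above.
    """
    for dedup_key in scheduler.get_periodic_jobs():
        status = scheduler.get_periodic_job_status(dedup_key)
        if status and status.get("job_name") == job_name:
            return dedup_key
    return None


# Usage with a running scheduler (not executed here):
#   dedup_key = find_dedup_key(scheduler, "cleanup_temp_files")
#   if dedup_key is not None:
#       await scheduler.trigger_periodic_job(dedup_key)
```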