Quick Start¶
Requirements¶
Python 3.9+
PostgreSQL 12+
Installation¶
pip install pg-scheduler
Database Setup¶
Create a PostgreSQL database for the scheduler:
CREATE DATABASE scheduler_db;
CREATE USER scheduler WITH PASSWORD 'scheduler123';
GRANT ALL PRIVILEGES ON DATABASE scheduler_db TO scheduler;
The scheduler creates the scheduled_jobs table automatically on first start.
Standalone Example¶
import asyncio
import asyncpg
from datetime import datetime, timedelta, UTC
from pg_scheduler import Scheduler
async def send_email(recipient: str, subject: str):
print(f"Sending email to {recipient}: {subject}")
await asyncio.sleep(1)
print(f"Email sent to {recipient}")
async def main():
db_pool = await asyncpg.create_pool(
user="scheduler",
password="scheduler123",
database="scheduler_db",
host="localhost",
port=5432,
)
scheduler = Scheduler(db_pool=db_pool)
await scheduler.start()
try:
job_id = await scheduler.schedule(
send_email,
execution_time=datetime.now(UTC) + timedelta(seconds=10),
args=("user@example.com", "Welcome!"),
)
print(f"Scheduled job: {job_id}")
# Keep the process alive so the scheduler can run
while True:
await asyncio.sleep(1)
finally:
await scheduler.shutdown()
await db_pool.close()
if __name__ == "__main__":
asyncio.run(main())
FastAPI Example¶
A more realistic setup using FastAPI’s lifespan protocol to start and stop the scheduler alongside the web application:
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from fastapi import FastAPI
import asyncpg
import asyncio
import logging
from pg_scheduler import Scheduler, periodic
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Periodic tasks are registered at import time via the decorator.
# The scheduler picks them up automatically when it starts.
@periodic(every=timedelta(minutes=5))
async def cleanup_stale_carts():
logger.info("Cleaning up stale shopping carts")
await asyncio.sleep(0.5)
logger.info("Stale carts cleaned")
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.db_pool = await asyncpg.create_pool(
host="localhost",
port=5432,
user="scheduler",
password="scheduler123",
database="scheduler_db",
)
app.state.scheduler = Scheduler(
app.state.db_pool,
max_concurrent_jobs=20,
misfire_grace_time=120,
)
await app.state.scheduler.start()
yield
await app.state.scheduler.shutdown()
await app.state.db_pool.close()
app = FastAPI(lifespan=lifespan)
async def process_order(order_id: str, delay: float = 1.0):
logger.info(f"Processing order {order_id}")
await asyncio.sleep(delay)
logger.info(f"Order {order_id} processed")
@app.post("/orders/{order_id}/process")
async def schedule_order(order_id: str):
await app.state.scheduler.schedule(
process_order,
execution_time=datetime.now() + timedelta(seconds=10),
args=(order_id,),
)
return {"status": "scheduled"}
Run it with:
pip install pg-scheduler[examples]
uvicorn myapp:app --host 0.0.0.0 --port 8000
Next Steps¶
User Guide – configuration, misfire grace time, priorities, conflict resolution.
Performance Tuning – the knobs that matter and how to size them for your workload.
API Reference – full class and function documentation.
Examples – periodic jobs, bulk scheduling, production configuration.