API Reference
Complete API documentation for PG Scheduler.
Core Classes
Scheduler
The main scheduler class that manages job execution:
- class pg_scheduler.Scheduler(db_pool: asyncpg.Pool, max_concurrent_jobs: int = 25, misfire_grace_time: int | None = 300, vacuum_config: VacuumConfig | None = None, vacuum_enabled: bool = True, batch_claim_limit: int = 10)
Bases: object
- HEARTBEAT_THRESHOLD = 120
- LEASE_DURATION = 60
- WORKER_ID_LENGTH = 8
- __init__(db_pool: asyncpg.Pool, max_concurrent_jobs: int = 25, misfire_grace_time: int | None = 300, vacuum_config: VacuumConfig | None = None, vacuum_enabled: bool = True, batch_claim_limit: int = 10)
Initialize the Scheduler with concurrency control and reliability features.
- Parameters:
db_pool – asyncpg connection pool for the PostgreSQL database.
max_concurrent_jobs – Maximum number of jobs to run concurrently
misfire_grace_time – Default seconds after execution_time before jobs expire. Set to None for no expiration. Can be overridden per-job.
vacuum_config – Configuration for job cleanup policies (uses defaults if None)
vacuum_enabled – Whether to enable automatic vacuum cleanup
batch_claim_limit – Maximum number of jobs to claim in a single batch (default 10)
- async schedule(func, *, execution_time: datetime, args: tuple = (), kwargs: dict = None, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_id: str | None = None, conflict_resolution: ConflictResolution = ConflictResolution.RAISE, misfire_grace_time: int | None | object = <object object>) -> str
Schedule an async I/O function to run at a specific time.
- Parameters:
func – The async function to schedule (must be async I/O only)
execution_time – datetime object specifying when to run the job
args – Tuple of positional arguments to pass to the function
kwargs – Dictionary of keyword arguments to pass to the function
priority – Job priority using JobPriority enum (CRITICAL, HIGH, NORMAL, or LOW)
max_retries – Maximum retry attempts for failed jobs
job_id – Optional custom job ID (auto-generated if not provided)
conflict_resolution – How to handle duplicate job_id (RAISE, IGNORE, REPLACE)
misfire_grace_time – Seconds after execution_time before the job expires:
  - Not specified (default): uses the scheduler’s misfire_grace_time
  - Explicit integer (e.g., 60): job expires after N seconds
  - Explicit None: job never expires
- Returns:
The job ID of the scheduled job
- Return type:
str
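The three-way behaviour of misfire_grace_time (omitted, explicit integer, explicit None) is commonly implemented with a sentinel default, which is what the `<object object>` default in the signature suggests. The following is a minimal sketch of that pattern, not the library's code; `_UNSET` and `resolve_grace_time` are hypothetical names introduced only for illustration.

```python
from typing import Optional

# Hypothetical sentinel: a unique object meaning "argument not supplied".
_UNSET = object()


def resolve_grace_time(per_job=_UNSET, scheduler_default: Optional[int] = 300) -> Optional[int]:
    """Return the effective grace time in seconds, or None for "never expires"."""
    if per_job is _UNSET:
        # Caller omitted the argument: fall back to the scheduler-wide default.
        return scheduler_default
    # Caller passed an explicit value; None is a legitimate choice here.
    return per_job


print(resolve_grace_time())      # 300 (scheduler default)
print(resolve_grace_time(60))    # 60
print(resolve_grace_time(None))  # None (never expires)
```

A plain `None` default could not distinguish "use the scheduler default" from "never expire", which is why a separate sentinel object is needed.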
- async schedule_bulk(jobs: List[JobSpec], *, conflict_resolution: ConflictResolution = ConflictResolution.IGNORE, batch_size: int = 1000) -> List[str]
Schedule multiple jobs in a single bulk operation for improved performance.
- Parameters:
jobs – List of JobSpec instances defining the jobs to schedule
conflict_resolution – Strategy for ALL jobs (IGNORE recommended for bulk)
batch_size – Max jobs per database transaction (default 1000)
- Returns:
List of job IDs in the same order as the input. A job that failed to insert has None in its position.
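To make the batch_size parameter concrete, here is a small sketch of how a list of job specs could be split into per-transaction batches. It is an illustration under assumptions, not the scheduler's implementation; `iter_batches` is a hypothetical helper and plain integers stand in for JobSpec instances.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def iter_batches(items: Sequence[T], batch_size: int = 1000) -> Iterator[List[T]]:
    """Yield consecutive slices of at most batch_size items, preserving order."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


# 2500 placeholder jobs split with the documented default batch size.
batches = list(iter_batches(list(range(2500)), batch_size=1000))
print([len(b) for b in batches])  # [1000, 1000, 500]
```

Because the slices are taken in order, concatenating the per-batch results keeps the returned IDs aligned with the input positions.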
- async execute_job(job_row)
Execute job with comprehensive error handling and state management.
- async monitor_heartbeats()
Monitor heartbeats and lease expiration with enhanced reliability.
- async schedule_job(job_name, execution_time, task_data, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_id: str | None = None, conflict_resolution: ConflictResolution = ConflictResolution.RAISE, misfire_grace_time: int | None = None) -> str
Schedule a job by inserting it into the ‘scheduled_jobs’ table.
- Parameters:
job_name (str) – The name of the job to schedule.
execution_time (datetime) – The time at which the job should be executed.
task_data (dict) – Additional data required for the job execution.
priority – Job priority using JobPriority enum (CRITICAL, HIGH, NORMAL, or LOW)
max_retries – Maximum retry attempts for failed jobs
job_id – Optional custom job ID (auto-generated if not provided)
conflict_resolution – How to handle duplicate job_id (RAISE, IGNORE, REPLACE)
misfire_grace_time – Per-job misfire grace time in seconds. None = use scheduler default
- Returns:
The job ID of the scheduled job
- Return type:
str
- Raises:
ValueError – If the provided job_id already exists and conflict_resolution is RAISE
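The conflict_resolution options can be pictured with an in-memory stand-in for the jobs table. This is only a sketch: the documentation above confirms that RAISE produces a ValueError, while the behaviour shown for IGNORE (keep the existing job) and REPLACE (overwrite it) is an assumption based on the option names. `Resolution` and `insert_job` are hypothetical and are not part of pg_scheduler.

```python
from enum import Enum
from typing import Dict


class Resolution(Enum):
    """Hypothetical stand-in for ConflictResolution."""
    RAISE = "raise"
    IGNORE = "ignore"
    REPLACE = "replace"


def insert_job(store: Dict[str, dict], job_id: str, payload: dict,
               resolution: Resolution = Resolution.RAISE) -> str:
    """Insert payload under job_id, handling duplicates per the chosen strategy."""
    if job_id in store:
        if resolution is Resolution.RAISE:
            raise ValueError(f"job_id {job_id!r} already exists")
        if resolution is Resolution.IGNORE:
            # Assumed semantics: leave the existing job untouched.
            return job_id
        # Resolution.REPLACE falls through and overwrites (assumed semantics).
    store[job_id] = payload
    return job_id


jobs: Dict[str, dict] = {}
insert_job(jobs, "report-1", {"v": 1})
insert_job(jobs, "report-1", {"v": 2}, Resolution.IGNORE)
print(jobs["report-1"])  # {'v': 1}
insert_job(jobs, "report-1", {"v": 3}, Resolution.REPLACE)
print(jobs["report-1"])  # {'v': 3}
```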
- async cancel_job(job_id: str) -> bool
Cancel a scheduled job by setting its status to ‘cancelled’.
- async run_vacuum() -> Dict[str, int]
Manually trigger vacuum policies and return statistics.
- Returns:
Dict with counts of deleted jobs by status
- async get_vacuum_stats(days: int = 7) -> list
Get vacuum statistics for the last N days (requires track_metrics=True).
- Parameters:
days – Number of days to look back
- Returns:
List of vacuum statistics records
- async get_total_vacuum_stats() -> dict
Get aggregated vacuum statistics across all time and workers (requires track_metrics=True).
- Returns:
Dict with total counts and last vacuum run time
Periodic Decorator
The @periodic decorator for recurring jobs:
- pg_scheduler.periodic(every: timedelta | None = None, cron: str | None = None, timezone: str | ZoneInfo | None = None, use_advisory_lock: bool = False, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_name: str | None = None, dedup_key: str | None = None, enabled: bool = True) -> Callable
Decorator to mark an async function as a periodic job.
Supports two scheduling modes:
1. Interval-based: Use the every parameter (e.g., every=timedelta(minutes=15))
2. Cron-based: Use the cron parameter (e.g., cron="0 0 * * *" for daily at midnight)
Features:
- Guarantees exactly one enqueue per window across many replicas (via dedup key)
- Self-reschedules at the end of each run
- Optional advisory-lock protection for exclusive execution
- Timezone support for cron expressions
- Parameters:
every – Time interval between executions (timedelta). Mutually exclusive with cron.
cron – Cron expression string (e.g., "0 0 * * *"). Mutually exclusive with every. Format: minute hour day month day_of_week.
timezone – Timezone for cron scheduling (e.g., “America/New_York”, “Europe/London”). Can be a string or ZoneInfo object. Only valid with cron. Defaults to UTC if not specified.
use_advisory_lock – Use PostgreSQL advisory locks for exclusive execution across replicas
priority – Job priority (CRITICAL, HIGH, NORMAL, LOW)
max_retries – Maximum retry attempts for failed executions
job_name – Custom job name (auto-generated from function name if None)
dedup_key – Custom deduplication key (auto-generated if None)
enabled – Whether the periodic job is enabled
Examples
# Interval-based scheduling
@periodic(every=timedelta(minutes=15))
async def cleanup_temp_files():
    print("Cleaning up temp files...")

# Cron-based scheduling (daily at midnight UTC)
@periodic(cron="0 0 * * *")
async def daily_backup():
    print("Running daily backup...")

# Cron with timezone (every Sunday at 3am US Eastern time)
@periodic(cron="0 3 * * SUN", timezone="America/New_York")
async def weekly_report():
    print("Generating weekly report...")

# Cron with priority and retries
@periodic(cron="0 9 * * MON-FRI", timezone="Europe/London",
          priority=JobPriority.HIGH, max_retries=3)
async def business_hours_task():
    print("Running business hours task...")
- Raises:
ValueError – If neither every nor cron is specified, or both are specified
ValueError – If timezone is specified without cron
ImportError – If cron is specified but croniter is not installed
TypeError – If decorated function is not async
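The "exactly one enqueue per window" guarantee relies on every replica computing the same dedup key for the same scheduling window. The sketch below shows one way such a key could be derived for interval-based jobs. It is an assumption for illustration, not the key format pg_scheduler uses; `window_dedup_key` is a hypothetical helper.

```python
import hashlib
from datetime import datetime, timedelta, timezone


def window_dedup_key(job_name: str, now: datetime, every: timedelta) -> str:
    """Derive a key that is identical for all calls falling in the same window."""
    interval = int(every.total_seconds())
    if interval < 1:
        raise ValueError("every must be at least one second")
    # Floor the timestamp to the start of its window (aligned to the Unix epoch).
    window_start = int(now.timestamp()) // interval * interval
    raw = f"{job_name}:{window_start}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


every = timedelta(minutes=15)
t1 = datetime(2024, 1, 1, 12, 1, tzinfo=timezone.utc)
t2 = datetime(2024, 1, 1, 12, 14, tzinfo=timezone.utc)
t3 = datetime(2024, 1, 1, 12, 16, tzinfo=timezone.utc)

# Two replicas waking up in the same 15-minute window compute the same key.
print(window_dedup_key("cleanup", t1, every) == window_dedup_key("cleanup", t2, every))  # True
# The next window yields a different key.
print(window_dedup_key("cleanup", t1, every) == window_dedup_key("cleanup", t3, every))  # False
```

With a unique constraint on such a key, only the first replica's insert for a given window would succeed, and the others would be rejected or ignored.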
Enums
JobPriority
- class pg_scheduler.JobPriority(value)
Bases: Enum
Job priority levels for scheduling.
Lower priority numbers indicate higher priority (jobs execute first). Jobs are processed in ascending priority order: 1 → 3 → 5 → 8.
- CRITICAL = 'critical'
- HIGH = 'high'
- NORMAL = 'normal'
- LOW = 'low'
- property db_value: int
Get the database integer value for this priority level.
- Returns:
Database priority value (1, 3, 5, or 8)
- Return type:
int
Note
Lower numbers = higher priority. Used for ORDER BY in SQL queries.
- classmethod from_db_value(db_value: int) -> JobPriority
Convert a database integer value back to a JobPriority enum.
- Parameters:
db_value – The integer priority value from the database (1, 3, 5, or 8)
- Returns:
The corresponding priority enum member
- Return type:
JobPriority
Note
If an unknown value is provided, defaults to NORMAL.
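The relationship between the string enum values and the integer database values can be sketched with a standalone enum. This is not the library's source. Only NORMAL = 5 is confirmed by the signatures above; the mapping CRITICAL = 1, HIGH = 3, LOW = 8 is an assumption inferred from the documented ordering 1 → 3 → 5 → 8. `Priority` and `_DB_VALUES` are hypothetical names.

```python
from enum import Enum


class Priority(Enum):
    """Hypothetical stand-in for JobPriority."""
    CRITICAL = "critical"
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"

    @property
    def db_value(self) -> int:
        # Lower number means higher priority.
        return _DB_VALUES[self]

    @classmethod
    def from_db_value(cls, db_value: int) -> "Priority":
        for member, value in _DB_VALUES.items():
            if value == db_value:
                return member
        # Unknown values fall back to NORMAL, as documented.
        return cls.NORMAL


# Assumed mapping; only NORMAL = 5 appears explicitly in the reference.
_DB_VALUES = {
    Priority.CRITICAL: 1,
    Priority.HIGH: 3,
    Priority.NORMAL: 5,
    Priority.LOW: 8,
}

# Sorting by db_value mirrors an ascending ORDER BY on the priority column.
ordered = sorted(Priority, key=lambda p: p.db_value)
print([p.name for p in ordered])  # ['CRITICAL', 'HIGH', 'NORMAL', 'LOW']
```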
ConflictResolution
VacuumTrigger
Configuration Classes
VacuumPolicy
- class pg_scheduler.VacuumPolicy(trigger: VacuumTrigger, days: int | None = None, keep_count: int | None = None)
Bases: object
Configuration for a vacuum policy
- trigger: VacuumTrigger
- classmethod immediate() -> VacuumPolicy
Delete immediately when job reaches this status
- classmethod after_days(days: int) -> VacuumPolicy
Delete jobs after N days in this status
- classmethod keep_last(count: int) -> VacuumPolicy
Keep only the last N jobs per job_name in this status
- classmethod never() -> VacuumPolicy
Never automatically clean jobs in this status
Never automatically clean jobs in this status
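To illustrate what "keep only the last N jobs per job_name" means in practice, the sketch below selects which finished jobs a keep_last-style policy would remove. It works on in-memory tuples rather than the database and is an assumption about the semantics, not the library's vacuum query; `ids_to_delete` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def ids_to_delete(jobs: List[Tuple[str, str, int]], keep_count: int) -> List[str]:
    """Given (job_id, job_name, finished_at) rows, return IDs beyond the newest keep_count per name."""
    if keep_count < 0:
        raise ValueError("keep_count must not be negative")
    by_name: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
    for job_id, job_name, finished_at in jobs:
        by_name[job_name].append((finished_at, job_id))
    doomed: List[str] = []
    for entries in by_name.values():
        # Newest first; everything after the first keep_count entries is removed.
        entries.sort(reverse=True)
        doomed.extend(job_id for _, job_id in entries[keep_count:])
    return sorted(doomed)


rows = [
    ("a1", "backup", 100), ("a2", "backup", 200), ("a3", "backup", 300),
    ("b1", "report", 150),
]
print(ids_to_delete(rows, keep_count=2))  # ['a1']
```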
VacuumConfig
- class pg_scheduler.VacuumConfig(completed: VacuumPolicy = None, failed: VacuumPolicy = None, cancelled: VacuumPolicy = None, interval_minutes: int = 60, track_metrics: bool = False)
Bases: object
Complete vacuum configuration for the scheduler
- completed: VacuumPolicy = None
- failed: VacuumPolicy = None
- cancelled: VacuumPolicy = None
- __init__(completed: VacuumPolicy = None, failed: VacuumPolicy = None, cancelled: VacuumPolicy = None, interval_minutes: int = 60, track_metrics: bool = False) -> None
Usage Examples
Basic Scheduler Usage
import asyncio
from datetime import UTC, datetime, timedelta  # datetime.UTC requires Python 3.11+

import asyncpg
from pg_scheduler import Scheduler


async def my_job(message: str):
    print(f"Processing: {message}")


async def main():
    db_pool = await asyncpg.create_pool(...)
    scheduler = Scheduler(db_pool)
    await scheduler.start()

    # Schedule a job to run five minutes from now
    await scheduler.schedule(
        my_job,
        execution_time=datetime.now(UTC) + timedelta(minutes=5),
        args=("Hello World",),
    )


# Run the example
asyncio.run(main())
Periodic Jobs
from datetime import timedelta

from pg_scheduler import periodic


@periodic(every=timedelta(hours=1))
async def hourly_task():
    print("Running hourly task")