API Reference

Complete API documentation for PG Scheduler.

Core Classes

Scheduler

The main scheduler class that manages job execution:

class pg_scheduler.Scheduler(db_pool: asyncpg.Pool, max_concurrent_jobs: int = 25, misfire_grace_time: int | None = 300, vacuum_config: VacuumConfig | None = None, vacuum_enabled: bool = True, batch_claim_limit: int = 10)

Bases: object

HEARTBEAT_THRESHOLD = 120
LEASE_DURATION = 60
WORKER_ID_LENGTH = 8
__init__(db_pool: asyncpg.Pool, max_concurrent_jobs: int = 25, misfire_grace_time: int | None = 300, vacuum_config: VacuumConfig | None = None, vacuum_enabled: bool = True, batch_claim_limit: int = 10)

Initialize the Scheduler with concurrency control and reliability features.

Parameters:
  • db_pool – asyncpg connection pool (asyncpg.Pool) for the PostgreSQL database.

  • max_concurrent_jobs – Maximum number of jobs to run concurrently

  • misfire_grace_time – Default seconds after execution_time before jobs expire. Set to None for no expiration. Can be overridden per-job.

  • vacuum_config – Configuration for job cleanup policies (uses defaults if None)

  • vacuum_enabled – Whether to enable automatic vacuum cleanup

  • batch_claim_limit – Maximum number of jobs to claim in a single batch (default 10)

async start()

Start the scheduler with reliability features

async shutdown()

Shutdown the scheduler gracefully with job completion
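One way to make sure shutdown() always follows start() is to wrap the pair in an async context manager. The sketch below is not part of PG Scheduler: running is a hypothetical helper name, and it relies only on the start() and shutdown() coroutines documented above.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def running(scheduler):
    """Start the scheduler on entry and shut it down on exit (hypothetical helper)."""
    await scheduler.start()
    try:
        yield scheduler
    finally:
        # Runs even if the body raises, so shutdown() is never skipped.
        await scheduler.shutdown()
```

With this helper, application code could read `async with running(Scheduler(db_pool)) as scheduler: ...`.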

async initialize_db()

Initialize database with worker tracking and reliability features

async recover_orphaned_jobs()

Recover jobs that were running when workers crashed

async periodic_orphan_recovery()

Periodically check for and recover orphaned jobs

async schedule(func, *, execution_time: datetime, args: tuple = (), kwargs: dict = None, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_id: str | None = None, conflict_resolution: ConflictResolution = ConflictResolution.RAISE, misfire_grace_time: int | None | object = <object object>) -> str

Schedule an async I/O function to run at a specific time.

Parameters:
  • func – The async function to schedule (must be async I/O only)

  • execution_time – datetime object specifying when to run the job

  • args – Tuple of positional arguments to pass to the function

  • kwargs – Dictionary of keyword arguments to pass to the function

  • priority – Job priority using JobPriority enum (CRITICAL, HIGH, NORMAL, or LOW)

  • max_retries – Maximum retry attempts for failed jobs

  • job_id – Optional custom job ID (auto-generated if not provided)

  • conflict_resolution – How to handle duplicate job_id (RAISE, IGNORE, REPLACE)

  • misfire_grace_time – Seconds after execution_time before the job expires.

    • Not specified (default): uses the scheduler’s misfire_grace_time

    • Explicit integer (e.g., 60): job expires after N seconds

    • Explicit None: job never expires

Returns:

The job ID of the scheduled job

Return type:

str
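The `<object object>` default on misfire_grace_time suggests a sentinel that lets schedule() tell an omitted argument apart from an explicit None. The sketch below models the three cases listed above with standard-library code only. The names _UNSET, resolve_grace_time, and is_expired are hypothetical, and the expiry rule (now later than execution_time plus the grace period) is an assumption drawn from the parameter description, not the library's confirmed implementation.

```python
from datetime import datetime, timedelta, timezone

# Stand-in sentinel: distinguishes "argument omitted" from an explicit None.
_UNSET = object()


def resolve_grace_time(scheduler_default, per_job=_UNSET):
    """Return the effective grace time in seconds, or None for 'never expires'."""
    return scheduler_default if per_job is _UNSET else per_job


def is_expired(execution_time, grace_seconds, now):
    """Assumed rule: a job expires once now > execution_time + grace period."""
    if grace_seconds is None:
        return False
    return now > execution_time + timedelta(seconds=grace_seconds)


due = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
now = due + timedelta(seconds=90)  # the job is 90 seconds late

uses_default = is_expired(due, resolve_grace_time(300), now)
per_job_60 = is_expired(due, resolve_grace_time(300, 60), now)
never_expires = is_expired(due, resolve_grace_time(300, None), now)
```

Under these assumptions only the job with a 60-second grace period has expired; the other two are still eligible to run.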

async schedule_bulk(jobs: List[JobSpec], *, conflict_resolution: ConflictResolution = ConflictResolution.IGNORE, batch_size: int = 1000) -> List[str]

Schedule multiple jobs in a single bulk operation for improved performance.

Parameters:
  • jobs – List of JobSpec instances defining the jobs to schedule

  • conflict_resolution – Strategy for ALL jobs (IGNORE recommended for bulk)

  • batch_size – Max jobs per database transaction (default 1000)

Returns:

List of job IDs in same order as input. Jobs that failed to insert will have None in their position.
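To illustrate what batch_size implies, the sketch below splits an input list into consecutive slices of at most batch_size items, and filters the None placeholders out of a result list. Both chunked and inserted_ids are hypothetical helpers; how PG Scheduler actually forms its batches is an assumption here.

```python
from typing import Iterator, List, Optional, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], batch_size: int = 1000) -> Iterator[List[T]]:
    """Yield consecutive slices of at most batch_size items (assumed batching scheme)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


def inserted_ids(job_ids: Sequence[Optional[str]]) -> List[str]:
    """Drop the None placeholders that mark jobs which were not inserted."""
    return [job_id for job_id in job_ids if job_id is not None]
```

Because the returned list keeps input order, pairing it with the original jobs via zip() identifies which specs were not inserted.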

async listen_for_jobs()

Listen for jobs with bulk claiming

async execute_job_with_concurrency_control(job_row)

Execute job with semaphore control

async execute_job(job_row)

Execute job with comprehensive error handling and state management

async load_task_functions()

Load task functions with error handling

async monitor_heartbeats()

Monitor heartbeats and lease expiration with enhanced reliability

async send_heartbeat(job_id)

Send heartbeats with lease renewal

async schedule_job(job_name, execution_time, task_data, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_id: str | None = None, conflict_resolution: ConflictResolution = ConflictResolution.RAISE, misfire_grace_time: int | None = None) -> str

Schedule a job by inserting it into the ‘scheduled_jobs’ table.

Parameters:
  • job_name (str) – The name of the job to schedule.

  • execution_time (datetime) – The time at which the job should be executed.

  • task_data (dict) – Additional data required for the job execution.

  • priority – Job priority using JobPriority enum (CRITICAL, HIGH, NORMAL, or LOW)

  • max_retries – Maximum retry attempts for failed jobs

  • job_id – Optional custom job ID (auto-generated if not provided)

  • conflict_resolution – How to handle duplicate job_id (RAISE, IGNORE, REPLACE)

  • misfire_grace_time – Per-job misfire grace time in seconds. None = use scheduler default

Returns:

The job ID of the scheduled job

Return type:

str

Raises:

ValueError – If the provided job_id already exists and conflict_resolution is RAISE

async cancel_job(job_id: str) -> bool

Cancel a scheduled job by setting its status to ‘cancelled’.

Parameters:

job_id (str) – The ID of the job to cancel

Returns:

True if the job was successfully cancelled, False otherwise

Return type:

bool
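Since cancel_job() returns a bool per job, cancelling several jobs can be wrapped in a small helper that reports which cancellations succeeded. cancel_many below is a hypothetical function, not part of the library; it uses only the cancel_job(job_id) coroutine documented above.

```python
import asyncio


async def cancel_many(scheduler, job_ids):
    """Cancel several jobs concurrently; map each job ID to cancel_job's result."""
    ids = list(job_ids)
    results = await asyncio.gather(*(scheduler.cancel_job(job_id) for job_id in ids))
    return dict(zip(ids, results))
```

A False value in the returned mapping means the job was not cancelled; the reference does not say which cases produce False.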

async update_job_status(job_id, status)

Atomically update the status of a job.

async run_vacuum() -> Dict[str, int]

Manually trigger vacuum policies and return statistics.

Returns:

Dict with counts of deleted jobs by status

async get_vacuum_stats(days: int = 7) -> list

Get vacuum statistics for the last N days (requires track_metrics=True).

Parameters:

days – Number of days to look back

Returns:

List of vacuum statistics records

async get_total_vacuum_stats() -> dict

Get aggregated vacuum statistics across all time and workers (requires track_metrics=True).

Returns:

Dict with total counts and last vacuum run time

get_periodic_jobs() -> Dict[str, PeriodicJobConfig]

Get all registered periodic jobs

enable_periodic_job(dedup_key: str) -> bool

Enable a specific periodic job

disable_periodic_job(dedup_key: str) -> bool

Disable a specific periodic job

async trigger_periodic_job(dedup_key: str) -> str | None

Manually trigger a periodic job execution

get_periodic_job_status(dedup_key: str) -> Dict[str, Any] | None

Get status information for a periodic job
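The management methods above can be combined, for example to pause every periodic job during maintenance. The helper below is a hypothetical sketch. It assumes that the dictionary returned by get_periodic_jobs() is keyed by dedup_key, which the reference does not state explicitly.

```python
def disable_all_periodic_jobs(scheduler):
    """Disable every registered periodic job; return the keys that were disabled."""
    disabled = []
    # Assumption: the dict returned by get_periodic_jobs() is keyed by dedup_key.
    for dedup_key in list(scheduler.get_periodic_jobs()):
        if scheduler.disable_periodic_job(dedup_key):
            disabled.append(dedup_key)
    return disabled
```

Keeping the returned keys makes it straightforward to call enable_periodic_job() on the same set afterwards.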

Periodic Decorator

The @periodic decorator for recurring jobs:

pg_scheduler.periodic(every: timedelta | None = None, cron: str | None = None, timezone: str | ZoneInfo | None = None, use_advisory_lock: bool = False, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_name: str | None = None, dedup_key: str | None = None, enabled: bool = True) -> Callable

Decorator to mark an async function as a periodic job.

Supports two scheduling modes:

  1. Interval-based: Use the every parameter (e.g., every=timedelta(minutes=15))

  2. Cron-based: Use the cron parameter (e.g., cron="0 0 * * *" for daily at midnight)

Features:
  • Guarantees exactly one enqueue per window across many replicas (via dedup key)

  • Self-reschedules at the end of each run

  • Optional advisory-lock protection for exclusive execution

  • Timezone support for cron expressions

Parameters:
  • every – Time interval between executions (timedelta). Mutually exclusive with cron.

  • cron – Cron expression string (e.g., "0 0 * * *"). Mutually exclusive with every. Format: minute hour day month day_of_week. Examples:

    • "0 0 * * *" = daily at midnight

    • "0 0 * * SUN" = every Sunday at midnight

    • "*/15 * * * *" = every 15 minutes

    • "0 9-17 * * MON-FRI" = every hour 9am-5pm on weekdays

  • timezone – Timezone for cron scheduling (e.g., “America/New_York”, “Europe/London”). Can be a string or ZoneInfo object. Only valid with cron. Defaults to UTC if not specified.

  • use_advisory_lock – Use PostgreSQL advisory locks for exclusive execution across replicas

  • priority – Job priority (CRITICAL, HIGH, NORMAL, LOW)

  • max_retries – Maximum retry attempts for failed executions

  • job_name – Custom job name (auto-generated from function name if None)

  • dedup_key – Custom deduplication key (auto-generated if None)

  • enabled – Whether the periodic job is enabled
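The five-field cron format described for the cron parameter can be made concrete by labelling each field. The sketch below only splits and names the fields; it does not evaluate the schedule (the decorator relies on croniter for that, per the Raises section). split_cron and CRON_FIELDS are hypothetical names used for illustration.

```python
# Field order as documented: minute hour day month day_of_week
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")


def split_cron(expression):
    """Label the five whitespace-separated fields of a cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


weekday_hours = split_cron("0 9-17 * * MON-FRI")
```

Here weekday_hours["hour"] is "9-17" and weekday_hours["day_of_week"] is "MON-FRI", matching the "every hour 9am-5pm on weekdays" reading.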

Examples

# Interval-based scheduling
@periodic(every=timedelta(minutes=15))
async def cleanup_temp_files():
    print("Cleaning up temp files...")

# Cron-based scheduling (daily at midnight UTC)
@periodic(cron="0 0 * * *")
async def daily_backup():
    print("Running daily backup...")

# Cron with timezone (every Sunday at 3am Eastern Time)
@periodic(cron="0 3 * * SUN", timezone="America/New_York")
async def weekly_report():
    print("Generating weekly report...")

# Cron with priority and retries
@periodic(cron="0 9 * * MON-FRI", timezone="Europe/London",
          priority=JobPriority.HIGH, max_retries=3)
async def business_hours_task():
    print("Running business hours task...")

Raises:
  • ValueError – If neither every nor cron is specified, or both are specified

  • ValueError – If timezone is specified without cron

  • ImportError – If cron is specified but croniter is not installed

  • TypeError – If decorated function is not async
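The argument rules behind the ValueError and TypeError cases can be expressed as a short validation function. This is a stand-in written for illustration, not the decorator's actual code: check_periodic_args is a hypothetical name, and both the order of the checks and the error messages are assumptions.

```python
import inspect
from datetime import timedelta


def check_periodic_args(func, every=None, cron=None, timezone=None):
    """Raise the documented error types for invalid @periodic-style arguments."""
    # Exactly one of `every` and `cron` must be given.
    if (every is None) == (cron is None):
        raise ValueError("specify exactly one of 'every' or 'cron'")
    # `timezone` only makes sense for cron schedules.
    if timezone is not None and cron is None:
        raise ValueError("'timezone' is only valid together with 'cron'")
    # Periodic jobs must be coroutine functions.
    if not inspect.iscoroutinefunction(func):
        raise TypeError(f"{func.__name__} must be defined with 'async def'")


async def hourly_task():
    pass


check_periodic_args(hourly_task, every=timedelta(hours=1))  # passes silently
```

The ImportError case is omitted because it depends on whether croniter is installed in the running environment.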

Enums

JobPriority

class pg_scheduler.JobPriority(value)

Bases: Enum

Job priority levels for scheduling.

Lower priority numbers indicate higher priority (jobs execute first). Jobs are processed in ascending priority order: 1 → 3 → 5 → 8.

CRITICAL = 'critical'
HIGH = 'high'
NORMAL = 'normal'
LOW = 'low'
property db_value: int

Get the database integer value for this priority level.

Returns:

Database priority value (1, 3, 5, or 8)

Return type:

int

Note

Lower numbers = higher priority. Used for ORDER BY in SQL queries.

classmethod from_db_value(db_value: int) -> JobPriority

Convert a database integer value back to a JobPriority enum.

Parameters:

db_value – The integer priority value from the database (1, 3, 5, or 8)

Returns:

The corresponding priority enum member

Return type:

JobPriority

Note

If an unknown value is provided, defaults to NORMAL.
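The relationship between the string labels, db_value, and from_db_value() can be modelled with a small stand-in enum. Priority and _DB_VALUES below are hypothetical names, not the library's classes. Only NORMAL = 5 appears explicitly in the signatures above; the values 1, 3, and 8 for CRITICAL, HIGH, and LOW are inferred from the documented ordering 1 → 3 → 5 → 8 and should be treated as an assumption.

```python
from enum import Enum


class Priority(Enum):
    """Stand-in for JobPriority, for illustration only."""

    CRITICAL = "critical"
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"

    @property
    def db_value(self):
        return _DB_VALUES[self]

    @classmethod
    def from_db_value(cls, db_value):
        for member, value in _DB_VALUES.items():
            if value == db_value:
                return member
        return cls.NORMAL  # unknown values fall back to NORMAL, as documented


# Assumed mapping: lower number = higher priority.
_DB_VALUES = {
    Priority.CRITICAL: 1,
    Priority.HIGH: 3,
    Priority.NORMAL: 5,
    Priority.LOW: 8,
}

# Ascending db_value mirrors an ORDER BY on the priority column.
execution_order = sorted(Priority, key=lambda p: p.db_value)
```

Sorting by db_value puts CRITICAL first and LOW last, which is the order in which claimed jobs would be processed under this mapping.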

__str__() -> str

String representation of the priority.

Returns:

The priority label (e.g., “critical”, “normal”)

Return type:

str

__repr__() -> str

Developer-friendly representation of the priority.

Returns:

Detailed representation with name, label, and db_value

Return type:

str

ConflictResolution

class pg_scheduler.ConflictResolution(value)

Bases: Enum

Strategies for handling duplicate job_id conflicts

RAISE = 'raise'
IGNORE = 'ignore'
REPLACE = 'replace'
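The three strategies can be illustrated with an in-memory dictionary standing in for the jobs table. This is a conceptual model only: insert_job is a hypothetical function, the ValueError for RAISE follows the schedule_job() documentation, and the IGNORE (keep the existing job) and REPLACE (overwrite it) behaviours are inferred from the member names.

```python
STRATEGIES = {"raise", "ignore", "replace"}


def insert_job(store, job_id, payload, strategy="raise"):
    """Insert payload under job_id; return True if the store was modified."""
    if strategy not in STRATEGIES:
        raise ValueError(f"unknown strategy: {strategy!r}")
    if job_id in store:
        if strategy == "raise":
            raise ValueError(f"job_id {job_id!r} already exists")
        if strategy == "ignore":
            return False  # keep the existing job untouched
        # strategy == "replace": fall through and overwrite
    store[job_id] = payload
    return True


jobs = {}
insert_job(jobs, "report-1", {"attempt": 1})
insert_job(jobs, "report-1", {"attempt": 2}, strategy="ignore")   # no change
insert_job(jobs, "report-1", {"attempt": 3}, strategy="replace")  # overwritten
```

After these calls the store holds the payload from the REPLACE call, since IGNORE left the first entry in place and REPLACE then overwrote it.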

VacuumTrigger

class pg_scheduler.VacuumTrigger(value)

Bases: Enum

Vacuum policy trigger types

IMMEDIATE = 'immediate'
TIME_BASED = 'time_based'
COUNT_BASED = 'count_based'
NEVER = 'never'

Configuration Classes

VacuumPolicy

class pg_scheduler.VacuumPolicy(trigger: VacuumTrigger, days: int | None = None, keep_count: int | None = None)

Bases: object

Configuration for a vacuum policy

trigger: VacuumTrigger
days: int | None = None
keep_count: int | None = None
classmethod immediate() -> VacuumPolicy

Delete immediately when job reaches this status

classmethod after_days(days: int) -> VacuumPolicy

Delete jobs after N days in this status

classmethod keep_last(count: int) -> VacuumPolicy

Keep only the last N jobs per job_name in this status

classmethod never() -> VacuumPolicy

Never automatically clean jobs in this status

__init__(trigger: VacuumTrigger, days: int | None = None, keep_count: int | None = None) -> None
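To make the after_days and keep_last policies concrete, the sketch below selects which finished jobs each policy would delete from an in-memory list. It is a conceptual model, not the library's SQL: the function names and the job_name / finished_at dictionary keys are hypothetical, and "last N" is assumed to mean the N most recently finished jobs.

```python
from collections import defaultdict
from datetime import timedelta


def expired_after_days(jobs, days, now):
    """Jobs that finished more than `days` days before `now` (after_days model)."""
    cutoff = now - timedelta(days=days)
    return [job for job in jobs if job["finished_at"] < cutoff]


def beyond_keep_last(jobs, count):
    """Jobs outside the newest `count` entries per job_name (keep_last model)."""
    by_name = defaultdict(list)
    for job in jobs:
        by_name[job["job_name"]].append(job)
    to_delete = []
    for group in by_name.values():
        group.sort(key=lambda job: job["finished_at"], reverse=True)
        to_delete.extend(group[count:])  # everything after the newest `count`
    return to_delete
```

In the real scheduler these policies would be attached per status through VacuumConfig, for example a completed policy of VacuumPolicy.after_days(7).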

VacuumConfig

class pg_scheduler.VacuumConfig(completed: VacuumPolicy = None, failed: VacuumPolicy = None, cancelled: VacuumPolicy = None, interval_minutes: int = 60, track_metrics: bool = False)

Bases: object

Complete vacuum configuration for the scheduler

completed: VacuumPolicy = None
failed: VacuumPolicy = None
cancelled: VacuumPolicy = None
interval_minutes: int = 60
track_metrics: bool = False
__post_init__()

Set sensible defaults for None policies

__init__(completed: VacuumPolicy = None, failed: VacuumPolicy = None, cancelled: VacuumPolicy = None, interval_minutes: int = 60, track_metrics: bool = False) -> None

Usage Examples

Basic Scheduler Usage

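import asyncio
from datetime import UTC, datetime, timedelta  # datetime.UTC requires Python 3.11+

import asyncpg
from pg_scheduler import Scheduler

async def my_job(message: str):
    print(f"Processing: {message}")

async def main():
    db_pool = await asyncpg.create_pool(...)
    scheduler = Scheduler(db_pool)
    await scheduler.start()

    # Schedule a job to run five minutes from now
    await scheduler.schedule(
        my_job,
        execution_time=datetime.now(UTC) + timedelta(minutes=5),
        args=("Hello World",)
    )
    # A long-running service would keep the event loop alive here
    # and call scheduler.shutdown() on exit.

asyncio.run(main())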

Periodic Jobs

from pg_scheduler import periodic
from datetime import timedelta

@periodic(every=timedelta(hours=1))
async def hourly_task():
    print("Running hourly task")