Scheduler
The central class that manages the polling loop, job execution, heartbeats, orphan recovery, and vacuum.
- class pg_scheduler.Scheduler(db_pool: asyncpg.Pool, max_concurrent_jobs: int = 25, misfire_grace_time: int | None = 300, vacuum_config: VacuumConfig | None = None, vacuum_enabled: bool = True, batch_claim_limit: int = 10, polling_config: PollingConfig | None = None)
Bases: object
- HEARTBEAT_THRESHOLD = 120
- LEASE_DURATION = 60
- WORKER_ID_LENGTH = 8
- __init__(db_pool: asyncpg.Pool, max_concurrent_jobs: int = 25, misfire_grace_time: int | None = 300, vacuum_config: VacuumConfig | None = None, vacuum_enabled: bool = True, batch_claim_limit: int = 10, polling_config: PollingConfig | None = None)
- async schedule(func, *, execution_time: datetime, args: tuple = (), kwargs: dict = None, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_id: str | None = None, conflict_resolution: ConflictResolution = ConflictResolution.RAISE, misfire_grace_time: int | None | object = <object object>) -> str
Schedule an async I/O function to run at a specific time.
- Parameters:
func – The async function to schedule (it must be async and I/O-bound only; blocking or CPU-bound work would stall the event loop)
execution_time – datetime object specifying when to run the job
args – Tuple of positional arguments to pass to the function
kwargs – Dictionary of keyword arguments to pass to the function
priority – Job priority using JobPriority enum (NORMAL or CRITICAL)
max_retries – Maximum retry attempts for failed jobs
job_id – Optional custom job ID (auto-generated if not provided)
conflict_resolution – How to handle duplicate job_id (RAISE, IGNORE, REPLACE)
misfire_grace_time – Seconds after execution_time before the job expires:
  - Not specified (default): uses the scheduler’s misfire_grace_time
  - Explicit integer (e.g., 60): the job expires N seconds after execution_time
  - Explicit None: the job never expires
- Returns:
The job ID of the scheduled job
- Return type:
str
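The three-way misfire_grace_time behavior described above can be modeled in plain Python. This is a sketch, not pg_scheduler's code: the sentinel `_UNSET` and the helpers `resolve_grace_time` and `is_misfired` are hypothetical names, and treating "expired" as "now is later than execution_time plus the grace window" is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical sentinel mirroring the "<object object>" default in the
# signature: it lets "argument omitted" differ from an explicit None.
_UNSET = object()


def resolve_grace_time(per_job, scheduler_default: Optional[int]) -> Optional[int]:
    """Return the effective grace time in seconds, or None for 'never expires'."""
    if per_job is _UNSET:
        return scheduler_default  # omitted: inherit the scheduler-wide value
    return per_job  # explicit int, or explicit None meaning "never expires"


def is_misfired(execution_time: datetime, grace: Optional[int], now: datetime) -> bool:
    """Assumed rule: True if `now` is past execution_time plus the grace window."""
    if grace is None:
        return False
    return now > execution_time + timedelta(seconds=grace)


# Example: a job due at 12:00 that is examined at 12:10 (600 s late).
due = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 12, 10, tzinfo=timezone.utc)
default_grace = resolve_grace_time(_UNSET, 300)  # scheduler default of 300 s
```

The sentinel is what lets the omitted case and the explicit-None case map to different outcomes, which a plain `None` default could not express.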
- async schedule_bulk(jobs: List[JobSpec], *, conflict_resolution: ConflictResolution = ConflictResolution.IGNORE, batch_size: int = 1000) -> List[str]
Schedule multiple jobs in one bulk operation, which performs better than scheduling them one at a time.
- Parameters:
jobs – List of JobSpec instances defining the jobs to schedule
conflict_resolution – Strategy applied to every job in the call (IGNORE is recommended for bulk scheduling)
batch_size – Max jobs per database transaction (default 1000)
- Returns:
List of job IDs in the same order as the input. A job that failed to insert has None in its position.
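A rough model of the batching and None-filled result described for schedule_bulk, assuming IGNORE skips any job whose ID already exists (including a repeat earlier in the same input) and reports it as None. `chunk` and `bulk_insert_ignore` are hypothetical helpers that use an in-memory set in place of the database.

```python
from typing import List, Optional, Sequence, Set


def chunk(items: Sequence, batch_size: int) -> List[Sequence]:
    """Split items into consecutive slices of at most batch_size elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def bulk_insert_ignore(
    job_ids: Sequence[str], existing: Set[str], batch_size: int = 1000
) -> List[Optional[str]]:
    """Hypothetical IGNORE model: conflicting IDs are skipped and reported as None."""
    results: List[Optional[str]] = []
    for batch in chunk(job_ids, batch_size):
        # In the real scheduler, each batch would presumably be one transaction.
        for job_id in batch:
            if job_id in existing:
                results.append(None)  # conflict ignored, nothing inserted
            else:
                existing.add(job_id)
                results.append(job_id)
    return results


print(bulk_insert_ignore(["a", "b", "c", "a"], {"b"}, batch_size=2))  # ['a', None, 'c', None]
```

Keeping one result slot per input job is what lets callers zip the returned list back against their original JobSpec list.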
- async execute_job(job_row)
Execute a single job row, handling errors and updating the job's state.
- async monitor_heartbeats()
Monitor worker heartbeats and job lease expiration.
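One plausible reading of what monitor_heartbeats checks, sketched under assumptions: HEARTBEAT_THRESHOLD and LEASE_DURATION are in seconds, and a running job counts as orphaned when either its lease has expired or its worker's last heartbeat is older than the threshold. `is_orphaned` is a hypothetical helper, not the library's recovery logic.

```python
from datetime import datetime, timedelta, timezone

# Same values as the Scheduler class attributes; reading them as seconds
# is an assumption.
LEASE_DURATION = 60
HEARTBEAT_THRESHOLD = 120


def is_orphaned(lease_expires_at: datetime, last_heartbeat: datetime, now: datetime) -> bool:
    """Hypothetical check: a running job is reclaimable if its lease has lapsed
    or its worker has been silent for longer than HEARTBEAT_THRESHOLD."""
    lease_expired = now >= lease_expires_at
    heartbeat_stale = now - last_heartbeat > timedelta(seconds=HEARTBEAT_THRESHOLD)
    return lease_expired or heartbeat_stale


# A job claimed at t0 with a lease of LEASE_DURATION seconds.
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
lease_end = t0 + timedelta(seconds=LEASE_DURATION)
```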
- async schedule_job(job_name, execution_time, task_data, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_id: str | None = None, conflict_resolution: ConflictResolution = ConflictResolution.RAISE, misfire_grace_time: int | None = None) -> str
Schedule a job by inserting it into the ‘scheduled_jobs’ table.
- Parameters:
job_name (str) – The name of the job to schedule.
execution_time (datetime) – The time at which the job should be executed.
task_data (dict) – Additional data required for the job execution.
priority – Job priority using JobPriority enum (NORMAL or CRITICAL)
max_retries – Maximum retry attempts for failed jobs
job_id – Optional custom job ID (auto-generated if not provided)
conflict_resolution – How to handle duplicate job_id (RAISE, IGNORE, REPLACE)
misfire_grace_time – Per-job misfire grace time in seconds. None (the default) uses the scheduler’s misfire_grace_time; unlike in schedule(), None here does not mean "never expires".
- Returns:
The job ID of the scheduled job
- Return type:
str
- Raises:
ValueError – If the provided job_id already exists and conflict_resolution is RAISE
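The three conflict_resolution strategies can be illustrated with an in-memory model. The ConflictResolution enum below is a local stand-in (only its member names come from this page), `insert_job` is hypothetical, and returning the existing ID under IGNORE is an assumption; the RAISE branch mirrors the documented ValueError.

```python
from enum import Enum
from typing import Dict


class ConflictResolution(Enum):
    # Hypothetical stand-in; only the member names come from the docs.
    RAISE = "raise"
    IGNORE = "ignore"
    REPLACE = "replace"


def insert_job(table: Dict[str, dict], job_id: str, row: dict, mode: ConflictResolution) -> str:
    """Model of duplicate job_id handling, using a dict as the jobs table."""
    if job_id in table:
        if mode is ConflictResolution.RAISE:
            raise ValueError(f"job_id {job_id!r} already exists")
        if mode is ConflictResolution.IGNORE:
            return job_id  # assumption: existing row kept, ID still returned
        # REPLACE: fall through and overwrite the existing row
    table[job_id] = row
    return job_id


jobs: Dict[str, dict] = {}
insert_job(jobs, "report", {"v": 1}, ConflictResolution.RAISE)
insert_job(jobs, "report", {"v": 2}, ConflictResolution.IGNORE)   # keeps v=1
insert_job(jobs, "report", {"v": 3}, ConflictResolution.REPLACE)  # now v=3
```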
- async cancel_job(job_id: str) -> bool
Cancel a scheduled job by setting its status to ‘cancelled’.
- async run_vacuum() -> Dict[str, int]
Manually trigger vacuum policies and return statistics.
- Returns:
Dict with counts of deleted jobs by status
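A sketch of the kind of per-status count run_vacuum returns, assuming an age-based retention policy per job status. The real policies come from VacuumConfig, which is not detailed on this page, so `RETENTION_DAYS` and `vacuum_counts` are hypothetical and only count rows instead of deleting them.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Tuple

# Hypothetical retention policy: days to keep finished jobs, per status.
RETENTION_DAYS = {"completed": 1, "failed": 7, "cancelled": 1}


def vacuum_counts(jobs: Iterable[Tuple[str, datetime]], now: datetime) -> Dict[str, int]:
    """Count, per status, the jobs an age-based policy would delete."""
    deleted: Counter = Counter()
    for status, finished_at in jobs:
        days = RETENTION_DAYS.get(status)
        if days is None:
            continue  # statuses without a policy (e.g. pending) are kept
        if now - finished_at > timedelta(days=days):
            deleted[status] += 1
    return dict(deleted)


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
jobs = [
    ("completed", now - timedelta(days=2)),   # older than 1 day: deleted
    ("completed", now - timedelta(hours=1)),  # kept
    ("failed", now - timedelta(days=3)),      # within 7 days: kept
    ("failed", now - timedelta(days=8)),      # deleted
    ("pending", now - timedelta(days=30)),    # no policy: kept
]
print(vacuum_counts(jobs, now))  # {'completed': 1, 'failed': 1}
```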
- async get_vacuum_stats(days: int = 7) -> list
Get vacuum statistics for the last N days (requires track_metrics=True).
- Parameters:
days – Number of days to look back
- Returns:
List of vacuum statistics records
- async get_total_vacuum_stats() -> dict
Get aggregated vacuum statistics across all time and workers (requires track_metrics=True).
- Returns:
Dict with total counts and last vacuum run time
- get_periodic_jobs() -> Dict[str, PeriodicJobConfig]
Get all registered periodic jobs.