Scheduler

The central class that manages the polling loop, job execution, heartbeats, orphan recovery, and vacuum.

class pg_scheduler.Scheduler(db_pool: asyncpg.Pool, max_concurrent_jobs: int = 25, misfire_grace_time: int | None = 300, vacuum_config: VacuumConfig | None = None, vacuum_enabled: bool = True, batch_claim_limit: int = 10, polling_config: PollingConfig | None = None)[source]

Bases: object

HEARTBEAT_THRESHOLD = 120
LEASE_DURATION = 60
WORKER_ID_LENGTH = 8
__init__(db_pool: asyncpg.Pool, max_concurrent_jobs: int = 25, misfire_grace_time: int | None = 300, vacuum_config: VacuumConfig | None = None, vacuum_enabled: bool = True, batch_claim_limit: int = 10, polling_config: PollingConfig | None = None)[source]
async start()[source]

Start the scheduler with reliability features

async shutdown()[source]

Shutdown the scheduler gracefully with job completion

async initialize_db()[source]

Initialize database with worker tracking and reliability features

async recover_orphaned_jobs()[source]

Recover jobs that were running when workers crashed

async periodic_orphan_recovery()[source]

Periodically check for and recover orphaned jobs

async schedule(func, *, execution_time: datetime, args: tuple = (), kwargs: dict = None, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_id: str | None = None, conflict_resolution: ConflictResolution = ConflictResolution.RAISE, misfire_grace_time: int | None | object = <object object>) str[source]

Schedule an async I/O function to run at a specific time.

Parameters:
  • func – The async function to schedule (must be async I/O only)

  • execution_time – datetime object specifying when to run the job

  • args – Tuple of positional arguments to pass to the function

  • kwargs – Dictionary of keyword arguments to pass to the function

  • priority – Job priority using JobPriority enum (NORMAL or CRITICAL)

  • max_retries – Maximum retry attempts for failed jobs

  • job_id – Optional custom job ID (auto-generated if not provided)

  • conflict_resolution – How to handle duplicate job_id (RAISE, IGNORE, REPLACE)

  • misfire_grace_time – Seconds after execution_time before job expires. - Not specified (default): uses scheduler’s misfire_grace_time - Explicit integer (e.g., 60): job expires after N seconds - Explicit None: job never expires

Returns:

The job ID of the scheduled job

Return type:

str

async schedule_bulk(jobs: List[JobSpec], *, conflict_resolution: ConflictResolution = ConflictResolution.IGNORE, batch_size: int = 1000) List[str][source]

Schedule multiple jobs in a single bulk operation for improved performance.

Parameters:
  • jobs – List of JobSpec instances defining the jobs to schedule

  • conflict_resolution – Strategy for ALL jobs (IGNORE recommended for bulk)

  • batch_size – Max jobs per database transaction (default 1000)

Returns:

List of job IDs in same order as input. Jobs that failed to insert will have None in their position.

async listen_for_jobs() None[source]
async execute_job_with_concurrency_control(job_row)[source]

Execute job with semaphore control

async execute_job(job_row)[source]

Execute job with comprehensive error handling and state management

async load_task_functions()[source]

Load task functions with error handling

async monitor_heartbeats()[source]

Monitor heartbeats and lease expiration with enhanced reliability

async send_heartbeat(job_id)[source]

Send heartbeats with lease renewal

async schedule_job(job_name, execution_time, task_data, priority: JobPriority = <JobPriority.NORMAL: normal (db_value=5)>, max_retries: int = 0, job_id: str | None = None, conflict_resolution: ConflictResolution = ConflictResolution.RAISE, misfire_grace_time: int | None = None) str[source]

Schedule a job by inserting it into the ‘scheduled_jobs’ table.

Parameters:
  • job_name (str) – The name of the job to schedule.

  • execution_time (datetime) – The time at which the job should be executed.

  • task_data (dict) – Additional data required for the job execution.

  • priority – Job priority using JobPriority enum (NORMAL or CRITICAL)

  • max_retries – Maximum retry attempts for failed jobs

  • job_id – Optional custom job ID (auto-generated if not provided)

  • conflict_resolution – How to handle duplicate job_id (RAISE, IGNORE, REPLACE)

  • misfire_grace_time – Per-job misfire grace time in seconds. None = use scheduler default

Returns:

The job ID of the scheduled job

Return type:

str

Raises:

ValueError – If the provided job_id already exists and conflict_resolution is RAISE

async cancel_job(job_id: str) bool[source]

Cancel a scheduled job by setting its status to ‘cancelled’.

Parameters:

job_id (str) – The ID of the job to cancel

Returns:

True if the job was successfully cancelled, False otherwise

Return type:

bool

async update_job_status(job_id, status)[source]

Atomically update the status of a job.

async run_vacuum() Dict[str, int][source]

Manually trigger vacuum policies and return statistics.

Returns:

Dict with counts of deleted jobs by status

async get_vacuum_stats(days: int = 7) list[source]

Get vacuum statistics for the last N days (requires track_metrics=True).

Parameters:

days – Number of days to look back

Returns:

List of vacuum statistics records

async get_total_vacuum_stats() dict[source]

Get aggregated vacuum statistics across all time and workers (requires track_metrics=True).

Returns:

Dict with total counts and last vacuum run time

get_periodic_jobs() Dict[str, PeriodicJobConfig][source]

Get all registered periodic jobs

enable_periodic_job(dedup_key: str) bool[source]

Enable a specific periodic job

disable_periodic_job(dedup_key: str) bool[source]

Disable a specific periodic job

async trigger_periodic_job(dedup_key: str) str | None[source]

Manually trigger a periodic job execution

get_periodic_job_status(dedup_key: str) Dict[str, Any] | None[source]

Get status information for a periodic job