Source code for pg_scheduler.periodic

"""
Periodic job functionality for PG Scheduler.

This module provides the @periodic decorator and related infrastructure for
scheduling recurring jobs with cross-replica deduplication and optional
advisory lock support.

Features:
- Interval-based scheduling (e.g., every 5 minutes)
- Cron expression scheduling (e.g., "0 0 * * *" for daily at midnight)
- Timezone support using zoneinfo (Python 3.9+ standard library)
"""

import datetime
import functools
import hashlib
import inspect
import logging
import struct
from dataclasses import dataclass
from datetime import UTC, timedelta
from typing import Callable, Dict, Optional, Union, TYPE_CHECKING
from zoneinfo import ZoneInfo

try:
    from croniter import croniter
    CRONITER_AVAILABLE = True
except ImportError:
    CRONITER_AVAILABLE = False
    croniter = None

from .job_priority import JobPriority

if TYPE_CHECKING:
    from .scheduler import Scheduler
    from .conflict_resolution import ConflictResolution

logger = logging.getLogger(__name__)


@dataclass
class PeriodicJobConfig:
    """
    Configuration for a periodic job.

    Supports two scheduling modes:
    1. Interval-based: Specify `interval` (a positive timedelta)
    2. Cron-based: Specify `cron` (cron expression string)

    Args:
        func: The async function to execute periodically
        interval: Time interval between executions; must be greater than zero
                  (mutually exclusive with cron)
        cron: Cron expression (e.g., "0 0 * * *" for daily at midnight)
              (mutually exclusive with interval)
        timezone: Timezone for cron scheduling (e.g., "America/New_York", "Europe/London").
                  Strings are converted to ZoneInfo. Uses UTC if not specified.
                  Requires cron to be set.
        use_advisory_lock: Use PostgreSQL advisory locks for exclusive execution
        priority: Job priority (defaults to JobPriority.NORMAL)
        max_retries: Maximum retry attempts for failed executions
        job_name: Custom job name (defaults to "periodic_<function name>")
        dedup_key: Custom deduplication key (derived from the function's dotted
                   path and its schedule if None)
        enabled: Whether the periodic job is enabled

    Raises:
        ValueError: If neither or both of `interval`/`cron` are given, if
            `interval` is not positive, if `cron` is not a valid expression,
            if `timezone` is given without `cron`, or if `timezone` names an
            unknown zone.
        TypeError: If `interval` is not a timedelta.
        ImportError: If `cron` is given but croniter is not installed.
    """

    func: Callable
    interval: Optional[timedelta] = None
    cron: Optional[str] = None
    timezone: Optional[Union[str, ZoneInfo]] = None
    use_advisory_lock: bool = False
    priority: Optional['JobPriority'] = None  # Filled with JobPriority.NORMAL in __post_init__
    max_retries: int = 0
    job_name: Optional[str] = None  # Auto-generated from function name if None
    dedup_key: Optional[str] = None  # Auto-generated if None
    enabled: bool = True

    def __post_init__(self):
        """Validate the schedule, normalize the timezone, and fill in defaults."""
        # Exactly one scheduling mode must be chosen.
        if self.interval is None and self.cron is None:
            raise ValueError("Either 'interval' or 'cron' must be specified")
        if self.interval is not None and self.cron is not None:
            raise ValueError("Cannot specify both 'interval' and 'cron'")

        if self.cron is not None:
            if not CRONITER_AVAILABLE:
                raise ImportError(
                    "croniter is required for cron-based scheduling. "
                    "Install it with: pip install croniter>=3.0.0"
                )
            # Reject malformed expressions now instead of failing later, when the
            # registry first tries to compute the next run.
            if not croniter.is_valid(self.cron):
                raise ValueError(f"Invalid cron expression: {self.cron!r}")

        if self.interval is not None:
            if not isinstance(self.interval, timedelta):
                raise TypeError(
                    f"'interval' must be a datetime.timedelta, got {type(self.interval).__name__}"
                )
            # A zero/negative interval would divide by zero when computing the
            # dedup window and would reschedule the job in a tight loop.
            if self.interval <= timedelta(0):
                raise ValueError(f"'interval' must be a positive timedelta, got {self.interval!r}")

        if self.timezone is not None:
            if self.cron is None:
                raise ValueError("'timezone' can only be used with cron-based scheduling")
            # Convert string timezone to ZoneInfo
            if isinstance(self.timezone, str):
                try:
                    self.timezone = ZoneInfo(self.timezone)
                except Exception as e:
                    raise ValueError(f"Invalid timezone '{self.timezone}': {e}") from e

        # Set defaults
        if self.priority is None:
            self.priority = JobPriority.NORMAL
        if self.job_name is None:
            self.job_name = f"periodic_{self.func.__name__}"
        if self.dedup_key is None:
            self.dedup_key = self._default_dedup_key()

    def _default_dedup_key(self) -> str:
        """Derive a deterministic 16-hex-character key from the function path and schedule."""
        # NOTE: keep this key-material format stable. The dedup key is embedded in the
        # job IDs used to deduplicate across replicas, so changing it would let old
        # and new deployments enqueue the same occurrence twice.
        func_signature = f"{self.func.__module__}.{self.func.__name__}"
        if self.interval is not None:
            schedule_str = f"interval:{self.interval.total_seconds()}"
        else:
            tz_str = str(self.timezone) if self.timezone else "UTC"
            schedule_str = f"cron:{self.cron}:tz:{tz_str}"
        key_material = f"{func_signature}:{schedule_str}"
        return hashlib.sha256(key_material.encode()).hexdigest()[:16]
class PeriodicJobRegistry: """Registry for periodic jobs""" def __init__(self): self._periodic_jobs: Dict[str, PeriodicJobConfig] = {} self._scheduler: Optional['Scheduler'] = None def register(self, config: PeriodicJobConfig): """Register a periodic job""" self._periodic_jobs[config.dedup_key] = config schedule_desc = config.cron if config.cron else f"every {config.interval}" tz_desc = f" ({config.timezone})" if config.timezone else "" logger.debug(f"Registered periodic job: {config.job_name} ({schedule_desc}{tz_desc}, dedup_key={config.dedup_key})") def set_scheduler(self, scheduler: 'Scheduler'): """Set the scheduler instance""" self._scheduler = scheduler def get_jobs(self) -> Dict[str, PeriodicJobConfig]: """Get all registered periodic jobs""" return self._periodic_jobs.copy() def _calculate_next_run(self, config: PeriodicJobConfig, base_time: Optional[datetime.datetime] = None) -> datetime.datetime: """ Calculate the next execution time for a periodic job. Args: config: The periodic job configuration base_time: Base time to calculate from (defaults to now in appropriate timezone) Returns: Next execution time as timezone-aware datetime """ if config.interval: # Interval-based scheduling if base_time is None: base_time = datetime.datetime.now(UTC) return base_time + config.interval elif config.cron: # Cron-based scheduling tz = config.timezone or UTC if base_time is None: base_time = datetime.datetime.now(tz) elif base_time.tzinfo != tz: # Convert base_time to the correct timezone base_time = base_time.astimezone(tz) # Use croniter to calculate next occurrence cron_iter = croniter(config.cron, base_time) next_run = cron_iter.get_next(datetime.datetime) # Ensure the result is timezone-aware if next_run.tzinfo is None: next_run = next_run.replace(tzinfo=tz) # Convert to UTC for internal use return next_run.astimezone(UTC) else: raise ValueError("Invalid periodic job configuration: neither interval nor cron specified") async def start_all_jobs(self): """Start all 
enabled periodic jobs""" if not self._scheduler: raise RuntimeError("No scheduler set on periodic job registry") for config in self._periodic_jobs.values(): if config.enabled: await self._start_periodic_job(config) async def _start_periodic_job(self, config: PeriodicJobConfig): """Start a single periodic job""" # Import here to avoid circular import from .conflict_resolution import ConflictResolution # Calculate next execution time next_run = self._calculate_next_run(config) # Create dedup job ID for this window window_key = self._get_window_key(next_run, config) job_id = f"periodic:{config.dedup_key}:{window_key}" try: await self._scheduler.schedule( self._create_periodic_wrapper(config), execution_time=next_run, job_id=job_id, conflict_resolution=ConflictResolution.IGNORE, # Dedup across replicas priority=config.priority, max_retries=config.max_retries ) logger.debug(f"Scheduled periodic job {config.job_name} for {next_run}") except Exception as e: logger.error(f"Failed to schedule periodic job {config.job_name}: {e}") def _get_window_key(self, execution_time: datetime.datetime, config: PeriodicJobConfig) -> str: """ Generate a window key for deduplication within time windows. 
For interval-based jobs: rounds down to interval boundary For cron-based jobs: uses the exact execution timestamp (rounded to minute) """ if config.interval: # Interval-based: use window number epoch = datetime.datetime(1970, 1, 1, tzinfo=UTC) seconds_since_epoch = (execution_time - epoch).total_seconds() interval_seconds = config.interval.total_seconds() window_number = int(seconds_since_epoch // interval_seconds) return str(window_number) else: # Cron-based: use timestamp rounded to minute for deduplication # This ensures same cron schedule time maps to same window key timestamp = int(execution_time.timestamp() // 60) # Round to minute return str(timestamp) def _create_periodic_wrapper(self, config: PeriodicJobConfig): """Create a wrapper function that handles periodic job execution and rescheduling""" @functools.wraps(config.func) async def periodic_wrapper(): lock_acquired = False lock_key = None try: # If advisory lock is enabled, try to acquire lock if config.use_advisory_lock: lock_key = self._get_advisory_lock_key(config) lock_acquired = await self._try_acquire_advisory_lock(lock_key) if not lock_acquired: logger.debug(f"Advisory lock for {config.job_name} already held by another worker, skipping execution") return # Skip execution if lock can't be acquired # Execute the original function if inspect.iscoroutinefunction(config.func): await config.func() else: # Handle sync functions config.func() logger.debug(f"Periodic job {config.job_name} completed successfully") except Exception as e: logger.error(f"Periodic job {config.job_name} failed: {e}") raise # Re-raise to let scheduler handle retries finally: # Release advisory lock if it was acquired if config.use_advisory_lock and lock_acquired and lock_key: await self._release_advisory_lock(lock_key) # Always reschedule for next execution (self-rescheduling) if config.enabled: await self._reschedule_periodic_job(config) # Set function name for scheduler registration periodic_wrapper.__name__ = 
f"periodic_{config.func.__name__}" return periodic_wrapper def _get_advisory_lock_key(self, config: PeriodicJobConfig) -> int: """Generate a numeric lock key for PostgreSQL advisory locks""" # PostgreSQL advisory locks use bigint (int8), so we need a numeric key # Hash the dedup_key to get a consistent numeric value hash_bytes = hashlib.sha256(config.dedup_key.encode()).digest()[:8] return struct.unpack('>q', hash_bytes)[0] # Convert to signed 64-bit int async def _try_acquire_advisory_lock(self, lock_key: int) -> bool: """Try to acquire a PostgreSQL advisory lock (non-blocking)""" try: result = await self._scheduler.db_pool.fetchval( "SELECT pg_try_advisory_lock($1);", lock_key ) return bool(result) except Exception as e: logger.error(f"Failed to acquire advisory lock {lock_key}: {e}") return False async def _release_advisory_lock(self, lock_key: int): """Release a PostgreSQL advisory lock""" try: await self._scheduler.db_pool.execute( "SELECT pg_advisory_unlock($1);", lock_key ) except Exception as e: logger.error(f"Failed to release advisory lock {lock_key}: {e}") async def _reschedule_periodic_job(self, config: PeriodicJobConfig): """Reschedule the periodic job for the next execution""" # Import here to avoid circular import from .conflict_resolution import ConflictResolution try: next_run = self._calculate_next_run(config) window_key = self._get_window_key(next_run, config) job_id = f"periodic:{config.dedup_key}:{window_key}" await self._scheduler.schedule( self._create_periodic_wrapper(config), execution_time=next_run, job_id=job_id, conflict_resolution=ConflictResolution.IGNORE, priority=config.priority, max_retries=config.max_retries ) logger.debug(f"Rescheduled periodic job {config.job_name} for {next_run}") except Exception as e: logger.error(f"Failed to reschedule periodic job {config.job_name}: {e}") # Global registry instance _periodic_registry = PeriodicJobRegistry()
def periodic(every: Optional[timedelta] = None,
             cron: Optional[str] = None,
             timezone: Optional[Union[str, ZoneInfo]] = None,
             use_advisory_lock: bool = False,
             priority: JobPriority = JobPriority.NORMAL,
             max_retries: int = 0,
             job_name: Optional[str] = None,
             dedup_key: Optional[str] = None,
             enabled: bool = True) -> Callable:
    """
    Decorator to mark an async function as a periodic job.

    Supports two scheduling modes:
    1. **Interval-based**: Use `every` parameter (e.g., `every=timedelta(minutes=15)`)
    2. **Cron-based**: Use `cron` parameter (e.g., `cron="0 0 * * *"` for daily at midnight)

    Features:
    - Exactly one enqueue per window across many replicas: each occurrence is
      scheduled under a job ID derived from the dedup key and its time window
    - Self-reschedules at the end of each run
    - Optional advisory-lock protection for exclusive execution
    - Timezone support for cron expressions

    Applying the decorator builds a PeriodicJobConfig and registers it with the
    module-level registry. The original function is returned unchanged, so it
    can still be called directly.

    Args:
        every: Time interval between executions (timedelta). Mutually exclusive with `cron`.
        cron: Cron expression string (e.g., "0 0 * * *"). Mutually exclusive with `every`.
              Format: minute hour day month day_of_week
              Examples:
              - "0 0 * * *" = daily at midnight
              - "0 0 * * SUN" = every Sunday at midnight
              - "*/15 * * * *" = every 15 minutes
              - "0 9-17 * * MON-FRI" = every hour 9am-5pm on weekdays
        timezone: Timezone for cron scheduling (e.g., "America/New_York", "Europe/London").
                  Can be a string or ZoneInfo object. Only valid with `cron`.
                  Defaults to UTC if not specified.
        use_advisory_lock: Use PostgreSQL advisory locks for exclusive execution across replicas
        priority: Job priority (CRITICAL, HIGH, NORMAL, LOW)
        max_retries: Maximum retry attempts for failed executions
        job_name: Custom job name (auto-generated from function name if None)
        dedup_key: Custom deduplication key (auto-generated if None)
        enabled: Whether the periodic job is enabled

    Examples:
        # Interval-based scheduling
        @periodic(every=timedelta(minutes=15))
        async def cleanup_temp_files():
            print("Cleaning up temp files...")

        # Cron-based scheduling (daily at midnight UTC)
        @periodic(cron="0 0 * * *")
        async def daily_backup():
            print("Running daily backup...")

        # Cron with timezone (every Sunday at 3am US Eastern time, EST or EDT)
        @periodic(cron="0 3 * * SUN", timezone="America/New_York")
        async def weekly_report():
            print("Generating weekly report...")

        # Cron with priority and retries
        @periodic(cron="0 9 * * MON-FRI", timezone="Europe/London",
                  priority=JobPriority.HIGH, max_retries=3)
        async def business_hours_task():
            print("Running business hours task...")

    Raises:
        (All raised when the decorator is applied to a function.)
        TypeError: If the decorated function is not an `async def` function
        ValueError: If neither `every` nor `cron` is specified, or both are specified
        ValueError: If `timezone` is specified without `cron`, or names an unknown zone
        ImportError: If `cron` is specified but croniter is not installed
    """
    def decorator(func: Callable) -> Callable:
        # Validate function is async; name the offending function so the error is actionable.
        if not inspect.iscoroutinefunction(func):
            func_name = getattr(func, "__qualname__", repr(func))
            raise TypeError(
                f"@periodic can only be applied to async functions (defined with 'async def'), "
                f"got {func_name!r} of type {type(func).__name__}"
            )

        # Create periodic job configuration (validates the schedule arguments)
        config = PeriodicJobConfig(
            func=func,
            interval=every,
            cron=cron,
            timezone=timezone,
            use_advisory_lock=use_advisory_lock,
            priority=priority,
            max_retries=max_retries,
            job_name=job_name,
            dedup_key=dedup_key,
            enabled=enabled,
        )

        # Register with global registry
        _periodic_registry.register(config)

        # Return the original function (it's still callable directly)
        return func

    return decorator