Task Scheduling & Cron Jobs in Python — Automate Anything on a Timer

By Kristy · March 2026 · 18 min read

Table of Contents

Why Task Scheduling Matters

Every production system has work that needs to happen on a schedule. Scrape prices every hour. Clean up temp files at midnight. Send weekly reports on Monday morning. Generate backups every 6 hours. Without proper scheduling, you end up with a mess of shell scripts, manual cron entries, and no visibility into what's running.

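Python gives you multiple ways to schedule tasks, from dead-simple one-liners to enterprise-grade distributed queues. The right choice depends on your needs: the schedule library for quick single-process scripts, APScheduler when you need persistence, cron-style triggers, or timezone support, python-crontab for managing system cron entries, Celery Beat for distributed production workloads, and a hand-rolled scheduler when you want zero external dependencies.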

Let's go through each approach with real, working code.

Simple Scheduling with the schedule Library

The schedule library is the fastest way to add periodic tasks to a Python script. Install it:

pip install schedule

Basic Usage

import schedule
import time


def check_prices():
    """Placeholder price-check job: announce the run on stdout."""
    message = "Checking prices..."
    print(message)
    # Your scraping logic here


def send_daily_report():
    """Placeholder daily-report job: announce the run on stdout."""
    message = "Sending daily report..."
    print(message)
    # Your email/notification logic here


def cleanup_temp_files():
    """Placeholder cleanup job: announce the run on stdout."""
    message = "Cleaning up..."
    print(message)
    # Your cleanup logic here


# Schedule jobs
schedule.every(30).minutes.do(check_prices)
schedule.every().day.at("09:00").do(send_daily_report)
schedule.every().monday.at("08:00").do(cleanup_temp_files)
schedule.every(6).hours.do(lambda: print("Health check OK"))

# Run the scheduler
while True:
    schedule.run_pending()
    time.sleep(1)

The API is beautifully readable. schedule.every(30).minutes.do(fn) does exactly what it says.

Passing Arguments to Jobs

def scrape_site(url: str, max_pages: int = 10):
    """Report which URL is being scraped and the page limit in effect."""
    summary = f"Scraping {url} (up to {max_pages} pages)"
    print(summary)


# Pass arguments directly
schedule.every(2).hours.do(scrape_site, "https://example.com", max_pages=50)
schedule.every(4).hours.do(scrape_site, "https://another.com")

Running Jobs Once

import schedule


def one_time_setup():
    """Run the initial setup, then hand back schedule.CancelJob."""
    print("Running initial setup...")
    cancel_marker = schedule.CancelJob  # Removes itself after running
    return cancel_marker


schedule.every(1).seconds.do(one_time_setup)
When to use schedule: Quick scripts, prototypes, single-process applications. It runs in the main thread (or a dedicated thread), stores nothing to disk, and doesn't survive process restarts. Perfect for simple automation that runs under a process manager like systemd or supervisord.

Thread-Safe Scheduling

import schedule
import threading
import time


def run_scheduler():
    """Poll for pending jobs once per second, forever (background-thread target)."""
    poll_interval = 1
    while True:
        schedule.run_pending()
        time.sleep(poll_interval)


# Set up jobs
schedule.every(10).minutes.do(lambda: print("Background task"))

# Start scheduler thread
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()

# Main thread continues doing other work
print("Main thread is free to do other things")
while True:
    time.sleep(60)
    print("Main thread heartbeat")

APScheduler — Advanced Job Scheduling

When you need persistence, multiple trigger types, timezone support, or job stores, APScheduler is the go-to. It's a full-featured scheduling framework.

pip install apscheduler

Three Trigger Types

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta


scheduler = BlockingScheduler()


def fetch_data():
    """Print a time-stamped "fetching" line."""
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] Fetching data...")


def generate_report():
    """Print a time-stamped "generating report" line."""
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] Generating report...")


def one_time_task():
    """Job body for the one-shot date trigger: print a fixed message."""
    message = "This runs exactly once!"
    print(message)


# Interval trigger — every 5 minutes
scheduler.add_job(fetch_data, IntervalTrigger(minutes=5), id="fetch_data")

# Cron trigger — weekdays at 9:00 AM
scheduler.add_job(
    generate_report,
    CronTrigger(day_of_week="mon-fri", hour=9, minute=0),
    id="daily_report",
)

# Date trigger — one-shot at specific time
scheduler.add_job(
    one_time_task,
    DateTrigger(run_date=datetime.now() + timedelta(hours=1)),
    id="one_shot",
)

print("Scheduler starting...")
scheduler.start()

Cron Expressions

# Full cron expression support
from apscheduler.triggers.cron import CronTrigger

# NOTE: APScheduler 3.x numbers weekdays 0=Monday..6=Sunday, while standard
# cron uses 0=Sunday. from_crontab() passes the weekday field through
# unchanged, so numeric weekdays fire one day later than a cron user expects.
# Weekday names are unambiguous, so use them instead of numbers.

# Every 15 minutes
CronTrigger.from_crontab("*/15 * * * *")

# Weekdays at 8:30 AM
CronTrigger.from_crontab("30 8 * * mon-fri")

# First day of every month at midnight
CronTrigger.from_crontab("0 0 1 * *")

# Every Sunday at 6 PM
CronTrigger.from_crontab("0 18 * * sun")

# With timezone
CronTrigger.from_crontab("0 9 * * *", timezone="America/New_York")

Persistent Job Store with SQLite

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.interval import IntervalTrigger


# Jobs survive restarts — stored in SQLite
jobstores = {
    "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
}

scheduler = BlockingScheduler(jobstores=jobstores)


def my_job():
    """Job body for the persistent-store example: print a fixed message."""
    message = "Running persistent job!"
    print(message)


# replace_existing=True prevents duplicates on restart
scheduler.add_job(
    my_job,
    IntervalTrigger(minutes=10),
    id="persistent_job",
    replace_existing=True,
)

scheduler.start()
Important: With persistent job stores, always use replace_existing=True and set an explicit id. Otherwise, every restart creates a duplicate job.

Background Scheduler (Non-Blocking)

from apscheduler.schedulers.background import BackgroundScheduler
import time


scheduler = BackgroundScheduler()
scheduler.add_job(lambda: print("tick"), "interval", seconds=30)
scheduler.start()

# Your application continues running
try:
    while True:
        time.sleep(1)
        # Main application logic here
except KeyboardInterrupt:
    scheduler.shutdown(wait=True)  # Graceful shutdown

Async Scheduler

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def async_task():
    """Demo coroutine job: log start, yield to the loop for one second, log completion."""
    pause_seconds = 1
    print("Running async task...")
    await asyncio.sleep(pause_seconds)
    print("Async task done!")


async def main():
    """Create and start the scheduler inside a running event loop, then idle."""
    # Build and start the scheduler from within the loop: recent APScheduler
    # 3.x releases bind to the running loop in start(), and calling
    # asyncio.get_event_loop() with no running loop is deprecated in modern
    # Python.
    scheduler = AsyncIOScheduler()
    scheduler.add_job(async_task, "interval", minutes=5)
    scheduler.start()

    # Block forever so the loop stays alive and scheduled jobs can fire
    await asyncio.Event().wait()


# Run with asyncio event loop
asyncio.run(main())

System Cron from Python

Sometimes you want Python to manage system crontab entries — adding, removing, or listing cron jobs programmatically. The python-crontab library wraps the system crontab.

pip install python-crontab

Managing Crontab Entries

from crontab import CronTab


# Access current user's crontab
cron = CronTab(user=True)

# One constant for tagging and lookup: find_comment() matches a plain string
# against the whole comment, so a shortened text would find nothing
BACKUP_COMMENT = "Daily backup script"

# Create a new cron job
job = cron.new(command="/usr/bin/python3 /home/user/scripts/backup.py")
job.setall("0 2 * * *")  # Daily at 2:00 AM
job.set_comment(BACKUP_COMMENT)

# Enable/disable
job.enable(False)  # Disable
job.enable(True)   # Re-enable

# Write changes to crontab
cron.write()

# List all cron jobs (a separate loop variable keeps `job` pointing at the
# entry created above)
for entry in cron:
    print(f"Schedule: {entry.slices} | Command: {entry.command} | Enabled: {entry.is_enabled()}")

# Find jobs by comment
backups = cron.find_comment(BACKUP_COMMENT)
for entry in backups:
    print(f"Found: {entry}")

# Remove a job (the backup job created above)
cron.remove(job)
cron.write()

Validating Cron Expressions

from crontab import CronTab


cron = CronTab(user=True)
job = cron.new(command="echo test")

# These all work
job.setall("*/5 * * * *")       # Every 5 minutes
job.setall("0 9 * * 1-5")       # Weekdays 9 AM
job.setall("0 0 1,15 * *")      # 1st and 15th of month

# Check next run time
from datetime import datetime
schedule = job.schedule(date_from=datetime.now())
print(f"Next run: {schedule.get_next()}")
print(f"Previous run: {schedule.get_prev()}")

# Get next 5 run times
for i, dt in enumerate(schedule):
    if i >= 5:
        break
    print(f"  Run {i+1}: {dt}")
Pro tip: Use python-crontab for deployment scripts that need to set up scheduled jobs on servers. It's cleaner than echoing cron lines into crontab files with shell commands.

Celery Beat for Distributed Tasks

For serious production workloads — tasks distributed across multiple workers, automatic retries, result tracking — Celery Beat is the industry standard.

pip install "celery[redis]"

Project Setup

# celery_app.py
from celery import Celery
from celery.schedules import crontab

app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    # Result backend: required for AsyncResult.status / .get() to return data
    backend="redis://localhost:6379/1",
    # Import tasks.py when the worker starts so the task names referenced in
    # beat_schedule below are actually registered
    include=["tasks"],
)

# Configure periodic tasks
app.conf.beat_schedule = {
    "scrape-every-hour": {
        "task": "tasks.scrape_prices",
        "schedule": 3600.0,  # seconds
    },
    "daily-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=9, minute=0),
    },
    "weekly-cleanup": {
        "task": "tasks.cleanup_old_data",
        "schedule": crontab(hour=0, minute=0, day_of_week="sunday"),
    },
    "every-5-minutes": {
        "task": "tasks.health_check",
        "schedule": 300.0,
        "args": ("production",),  # Pass arguments
    },
}

# crontab() schedules above are evaluated in this timezone
app.conf.timezone = "America/New_York"

Task Definitions

# tasks.py
from celery_app import app
import logging

logger = logging.getLogger(__name__)


@app.task(bind=True, max_retries=3, default_retry_delay=60)
def scrape_prices(self):
    """Return the latest prices; on any error, log it and request a Celery retry."""
    try:
        logger.info("Scraping prices...")
        # Your scraping logic
        return {"BTC": 65000, "ETH": 3400}
    except Exception as err:
        logger.error(f"Scrape failed: {err}")
        raise self.retry(exc=err)


@app.task
def generate_report():
    """Build and email the daily report (placeholder body)."""
    logger.info("Generating report...")
    # Build report, send email
    outcome = "Report sent"
    return outcome


@app.task
def cleanup_old_data():
    """Delete data older than 90 days (placeholder body)."""
    logger.info("Cleaning up...")
    # Delete old records
    outcome = "Cleanup complete"
    return outcome


@app.task
def health_check(environment: str):
    """Log a health check for *environment* and return a status report."""
    logger.info(f"Health check for {environment}")
    # Check DB, Redis, API endpoints
    report = {"status": "healthy"}
    report["env"] = environment
    return report

Running Celery

# Terminal 1: Start the worker
celery -A celery_app worker --loglevel=info

# Terminal 2: Start the beat scheduler
celery -A celery_app beat --loglevel=info

# Or combine both (for development)
celery -A celery_app worker --beat --loglevel=info

Dynamic Schedule Management

# Add a task at runtime
from celery_app import app
from celery.schedules import crontab


# Using database-backed scheduler (django-celery-beat or similar)
# allows adding/removing tasks without restart

# Manual one-off scheduled task
from tasks import scrape_prices

COUNTDOWN_SECONDS = 300
result = scrape_prices.apply_async(countdown=COUNTDOWN_SECONDS)  # Run in 5 minutes
print(f"Task ID: {result.id}")

# Check result (status/get need a result backend configured on the Celery app)
print(f"Status: {result.status}")
# The task does not start until the countdown has elapsed, so the wait must
# cover the countdown plus the task's runtime; a shorter timeout always expires
print(f"Result: {result.get(timeout=COUNTDOWN_SECONDS + 60)}")

Building a Scheduler from Scratch

Sometimes you don't want external dependencies. Here's a minimal but functional task scheduler using only the standard library:

import threading
import time
import logging
from datetime import datetime, timedelta
from typing import Callable
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


@dataclass
class Job:
    """A registered periodic job together with its runtime bookkeeping."""

    name: str                   # Key under which the scheduler stores the job
    func: Callable              # Called as func(*args, **kwargs) on each run
    interval_seconds: float     # Delay between the end of one run and the next due time
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    next_run: datetime = field(default_factory=datetime.now)  # Defaults to creation time
    last_run: datetime | None = None  # None until a run completes successfully
    run_count: int = 0          # Successful runs
    error_count: int = 0        # Runs that raised
    enabled: bool = True        # Disabled jobs are never selected as due


class SimpleScheduler:
    """Lightweight task scheduler — no external dependencies.

    Jobs are kept in a dict keyed by name and run sequentially by one loop
    that polls every 500ms. A job's next due time is computed from the moment
    its previous run finished, so long-running jobs push their schedule back.
    """

    def __init__(self):
        self.jobs: dict[str, Job] = {}
        self._running = False
        self._lock = threading.Lock()  # Guards self.jobs
        self._thread: threading.Thread | None = None

    def every(self, seconds: float, name: str | None = None):
        """Decorator to register a periodic job.

        The job is named *name*, or the function's ``__name__`` when omitted.
        The decorated function is returned unchanged.
        """
        def decorator(func: Callable):
            job_name = name or func.__name__
            self.add_job(job_name, func, seconds)
            return func
        return decorator

    def add_job(self, name: str, func: Callable, interval_seconds: float,
                args: tuple = (), kwargs: dict | None = None):
        """Register a job with the scheduler, replacing any job of the same name."""
        with self._lock:
            self.jobs[name] = Job(
                name=name,
                func=func,
                interval_seconds=interval_seconds,
                args=args,
                kwargs=kwargs or {},
            )
        logger.info(f"Job '{name}' registered (every {interval_seconds}s)")

    def remove_job(self, name: str):
        """Remove a job by name; unknown names are ignored."""
        with self._lock:
            if name in self.jobs:
                del self.jobs[name]
                logger.info(f"Job '{name}' removed")

    def _run_job(self, job: Job):
        """Execute a single job, counting the outcome; exceptions never escape."""
        try:
            logger.info(f"Running job '{job.name}'...")
            job.func(*job.args, **job.kwargs)
            job.run_count += 1
            job.last_run = datetime.now()
            logger.info(f"Job '{job.name}' completed (run #{job.run_count})")
        except Exception as e:
            job.error_count += 1
            # exc_info keeps the traceback; the message alone rarely explains a failure
            logger.error(
                f"Job '{job.name}' failed: {e} (errors: {job.error_count})",
                exc_info=True,
            )

    def _loop(self):
        """Main scheduler loop: run every due job, then sleep briefly."""
        while self._running:
            now = datetime.now()
            # Snapshot due jobs under the lock; run them outside it so a slow
            # job does not block add_job/remove_job/status
            with self._lock:
                due_jobs = [
                    j for j in self.jobs.values()
                    if j.enabled and j.next_run <= now
                ]

            for job in due_jobs:
                self._run_job(job)
                job.next_run = datetime.now() + timedelta(seconds=job.interval_seconds)

            time.sleep(0.5)  # Check every 500ms

    def start(self, background: bool = False):
        """Start the scheduler.

        With ``background=True`` the loop runs in a daemon thread and this
        call returns immediately; otherwise it blocks until Ctrl+C. Calling
        start() while already running is ignored, since a second loop would
        execute every job twice.
        """
        if self._running:
            logger.warning("Scheduler already running; ignoring start()")
            return

        self._running = True
        logger.info(f"Scheduler started with {len(self.jobs)} jobs")

        if background:
            self._thread = threading.Thread(target=self._loop, daemon=True)
            self._thread.start()
        else:
            try:
                self._loop()
            except KeyboardInterrupt:
                self.stop()

    def stop(self):
        """Stop the scheduler gracefully, waiting up to 5s for the loop thread."""
        self._running = False
        thread, self._thread = self._thread, None
        # A job may call stop() from the scheduler thread itself; joining the
        # current thread would raise RuntimeError
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=5)
        logger.info("Scheduler stopped")

    def status(self) -> list[dict]:
        """Return one dict per job with its configuration and run statistics."""
        with self._lock:
            return [
                {
                    "name": j.name,
                    "enabled": j.enabled,
                    "interval": j.interval_seconds,
                    "runs": j.run_count,
                    "errors": j.error_count,
                    "last_run": j.last_run.isoformat() if j.last_run else None,
                    "next_run": j.next_run.isoformat(),
                }
                for j in self.jobs.values()
            ]


# Usage
scheduler = SimpleScheduler()


@scheduler.every(60, name="heartbeat")
def heartbeat():
    """Print a liveness line stamped with the current time."""
    stamp = datetime.now()
    print(f"❤️ alive at {stamp:%H:%M:%S}")


@scheduler.every(300, name="check_disk")
def check_disk():
    """Print root-filesystem usage and warn when it is above 90%."""
    import shutil
    stats = shutil.disk_usage("/")
    pct = stats.used / stats.total * 100
    print(f"Disk usage: {pct:.1f}%")
    if pct <= 90:
        return
    print("⚠️ Disk space critical!")


@scheduler.every(3600, name="cleanup")
def cleanup():
    """Delete regular files matching /tmp/myapp_* last modified over 24 hours ago."""
    from pathlib import Path
    cutoff = time.time() - 86400
    removed = 0
    for f in Path("/tmp").glob("myapp_*"):
        try:
            # Directories also match the glob and cannot be unlink()ed
            if not f.is_file() or f.stat().st_mtime >= cutoff:
                continue
            f.unlink()
        except FileNotFoundError:
            # Another process removed it between listing and stat/unlink
            continue
        removed += 1
    print(f"Cleaned {removed} temp files")


if __name__ == "__main__":
    scheduler.start()  # Blocking — Ctrl+C to stop

Real-World Examples

Periodic Web Scraping

import schedule
import json
import logging
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)


def scrape_and_store():
    """Scrape data and append to daily JSON file.

    Fetches the price endpoint, wraps the decoded JSON with a timestamp and
    appends it as one line to ``data/prices_YYYYMMDD.jsonl``. A failed fetch
    or decode is logged and the run is skipped.
    """
    import urllib.request

    url = "https://api.example.com/prices"
    try:
        with urllib.request.urlopen(url, timeout=30) as resp:
            data = json.loads(resp.read())
    except Exception as e:
        logger.error(f"Scrape failed: {e}")
        return

    # Take a single timestamp so the entry and the file it lands in always
    # agree, even when the job runs right at midnight
    now = datetime.now()

    # Store with timestamp
    entry = {
        "timestamp": now.isoformat(),
        "data": data,
    }

    output = Path(f"data/prices_{now:%Y%m%d}.jsonl")
    output.parent.mkdir(parents=True, exist_ok=True)

    with open(output, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")

    logger.info(f"Stored {len(data)} prices → {output}")


# Run every hour, plus once at startup
scrape_and_store()
schedule.every(1).hour.do(scrape_and_store)

Database Cleanup

import sqlite3
from datetime import datetime, timedelta


def cleanup_database(db_path: str, retention_days: int = 90):
    """Remove records older than retention period.

    Deletes rows from the ``logs``, ``sessions`` and ``temp_data`` tables
    whose date column sorts before the cutoff (now minus *retention_days*,
    as an ISO-8601 string), then vacuums the database.

    Returns the total number of rows deleted. Raises ``sqlite3.Error`` if a
    table is missing or the database cannot be opened; the connection is
    closed either way.
    """
    # NOTE(review): the comparison is textual, so it assumes the date columns
    # hold ISO-8601 strings in the same format — confirm against the schema.
    cutoff = (datetime.now() - timedelta(days=retention_days)).isoformat()

    # Table and column names are fixed here, never user input, so the f-string
    # below is safe; the cutoff itself is still bound as a parameter.
    tables = {
        "logs": "created_at",
        "sessions": "last_active",
        "temp_data": "timestamp",
    }

    total_deleted = 0
    conn = sqlite3.connect(db_path)
    try:
        for table, date_col in tables.items():
            cursor = conn.execute(
                f"DELETE FROM {table} WHERE {date_col} < ?",
                (cutoff,)
            )
            deleted = cursor.rowcount
            total_deleted += deleted
            if deleted > 0:
                print(f"  {table}: deleted {deleted} rows")

        # VACUUM cannot run inside a transaction, so commit the deletes first
        conn.commit()

        # Reclaim space
        conn.execute("VACUUM")
    finally:
        conn.close()

    print(f"Cleanup complete: {total_deleted} total rows deleted")
    return total_deleted

Health Check Monitor

import urllib.request
import json
import time
import logging

logger = logging.getLogger(__name__)


def check_endpoints(endpoints: dict | None = None, timeout: float = 10):
    """Check health of multiple services and alert on failures.

    Args:
        endpoints: Mapping of service name to URL. Defaults to the built-in
            API / Docs / Dashboard set.
        timeout: Per-request timeout in seconds.

    Returns:
        Mapping of service name to ``{"status": "up", "code", "latency_ms"}``
        or ``{"status": "down", "error"}``. Any exception from the request
        (including HTTP error statuses raised by urlopen) counts as down.
    """
    if endpoints is None:
        endpoints = {
            "API": "https://api.myapp.com/health",
            "Docs": "https://docs.myapp.com",
            "Dashboard": "https://dash.myapp.com/ping",
        }

    results = {}
    for name, url in endpoints.items():
        # perf_counter is monotonic, so the latency cannot go negative or
        # jump when the wall clock is adjusted
        start = time.perf_counter()
        try:
            req = urllib.request.Request(url, method="GET")
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                status = resp.status
                latency = (time.perf_counter() - start) * 1000
                results[name] = {
                    "status": "up",
                    "code": status,
                    "latency_ms": round(latency),
                }
        except Exception as e:
            results[name] = {
                "status": "down",
                "error": str(e),
            }
            logger.error(f"🔴 {name} is DOWN: {e}")

    # Summary
    up = sum(1 for r in results.values() if r["status"] == "up")
    total = len(results)
    logger.info(f"Health check: {up}/{total} services up")

    if up < total:
        send_alert(results)

    return results


def send_alert(results: dict):
    """Print an alert listing every service whose status is "down"."""
    failure_lines = [
        f"  🔴 {name}: {info.get('error', 'unknown')}\n"
        for name, info in results.items()
        if info["status"] == "down"
    ]
    msg = "⚠️ Service Alert:\n" + "".join(failure_lines)
    # Send via email, Slack, Telegram, etc.
    print(msg)

Error Handling & Retry Patterns

Scheduled tasks fail. Networks go down, APIs return errors, disks fill up. Your scheduler needs to handle this gracefully.

Retry with Exponential Backoff

import time
import logging
from functools import wraps

logger = logging.getLogger(__name__)


def with_retry(max_retries: int = 3, base_delay: float = 1.0,
               max_delay: float = 60.0, exceptions: tuple = (Exception,)):
    """Build a decorator that retries the wrapped call with exponential backoff.

    The call is attempted up to ``max_retries + 1`` times. After a failed
    attempt matching *exceptions*, the wrapper sleeps
    ``min(base_delay * 2**attempt, max_delay)`` seconds; the last failure is
    logged and re-raised. Exceptions outside *exceptions* propagate at once.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            total_attempts = max_retries + 1
            attempt = 0
            while attempt < total_attempts:
                try:
                    return func(*args, **kwargs)
                except exceptions as err:
                    out_of_retries = attempt == max_retries
                    if out_of_retries:
                        logger.error(
                            f"{func.__name__} failed after {max_retries + 1} attempts: {err}"
                        )
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    logger.warning(
                        f"{func.__name__} attempt {attempt + 1} failed: {err}. Retrying in {delay:.1f}s"
                    )
                    time.sleep(delay)
                attempt += 1
        return wrapper
    return decorator


@with_retry(max_retries=3, base_delay=2.0)
def fetch_api_data(url: str) -> dict:
    """Fetch *url* (30s timeout) and return its body decoded as JSON."""
    # Imported locally: the imports accompanying this example (time, logging,
    # functools.wraps) do not include json
    import json
    import urllib.request
    with urllib.request.urlopen(url, timeout=30) as resp:
        return json.loads(resp.read())


# Usage in a scheduled job
def scheduled_fetch():
    """Fetch and process API data; log and swallow the error once retries run out."""
    try:
        payload = fetch_api_data("https://api.example.com/data")
        process_data(payload)
    except Exception:
        # Every retry failed — record it and return normally;
        # the scheduler's next cycle gets another chance
        logger.error("Fetch completely failed, will retry next cycle")

Dead Letter Queue Pattern

import json
from pathlib import Path
from datetime import datetime


class DeadLetterQueue:
    """Store failed job executions for later inspection and replay."""

    def __init__(self, path: str = "dead_letters.jsonl"):
        self.path = Path(path)

    def add(self, job_name: str, error: str, context: dict = None):
        """Record a failed job execution."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "job": job_name,
            "error": error,
            "context": context or {},
        }
        with open(self.path, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def get_failures(self, job_name: str = None, limit: int = 50) -> list:
        """Retrieve recent failures, optionally filtered by job name."""
        if not self.path.exists():
            return []

        failures = []
        for line in self.path.read_text().strip().split("\n"):
            if not line:
                continue
            entry = json.loads(line)
            if job_name and entry["job"] != job_name:
                continue
            failures.append(entry)

        return failures[-limit:]

    def replay(self, job_registry: dict):
        """Attempt to replay all failed jobs."""
        failures = self.get_failures()
        replayed = 0

        for entry in failures:
            job_func = job_registry.get(entry["job"])
            if job_func:
                try:
                    job_func(**entry.get("context", {}))
                    replayed += 1
                except Exception as e:
                    print(f"Replay failed for {entry['job']}: {e}")

        print(f"Replayed {replayed}/{len(failures)} failed jobs")


# Usage
dlq = DeadLetterQueue()

def risky_job():
    """Job template: on failure, record the error in the dead-letter queue and re-raise."""
    try:
        # ... do work
        pass
    except Exception as err:
        failure_context = {"attempt_time": datetime.now().isoformat()}
        dlq.add("risky_job", str(err), failure_context)
        raise

Monitoring Scheduled Jobs

Structured Logging

import logging
import json
from datetime import datetime


class JobLogger:
    """Structured logging for scheduled jobs.

    Each event is written as one JSON object per line to *log_file* through
    the shared ``"scheduler"`` logger.
    """

    def __init__(self, log_file: str = "jobs.log"):
        import os  # local import: only needed to normalise the path below

        self.logger = logging.getLogger("scheduler")
        self.logger.setLevel(logging.INFO)

        # The named logger is process-wide, so a second JobLogger for the
        # same file must not attach a second handler — every event would be
        # written twice.
        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)

    def log_start(self, job_name: str):
        """Record that *job_name* has started."""
        self._log("START", job_name)

    def log_success(self, job_name: str, duration_ms: float, result: str = ""):
        """Record a successful run with its duration and a result summary."""
        self._log("SUCCESS", job_name, duration_ms=duration_ms, result=result)

    def log_failure(self, job_name: str, error: str, duration_ms: float = 0):
        """Record a failed run with its error text and duration."""
        self._log("FAILURE", job_name, duration_ms=duration_ms, error=error)

    def _log(self, event: str, job_name: str, **extra):
        """Emit one JSON line with timestamp, event, job name and *extra* fields."""
        entry = {
            "ts": datetime.now().isoformat(),
            "event": event,
            "job": job_name,
            **extra,
        }
        self.logger.info(json.dumps(entry))


# Usage
job_log = JobLogger()

def monitored_job(name: str, func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` and log start, then success or failure with duration.

    Returns whatever *func* returns; exceptions are logged and re-raised.
    """
    import time

    def elapsed_ms():
        return (time.time() - started) * 1000

    job_log.log_start(name)
    started = time.time()
    try:
        outcome = func(*args, **kwargs)
        # Only the first 200 characters of the result's string form are logged
        job_log.log_success(name, elapsed_ms(), str(outcome)[:200])
        return outcome
    except Exception as err:
        job_log.log_failure(name, str(err), elapsed_ms())
        raise

Heartbeat Monitoring

import time
import urllib.request
from datetime import datetime


class HeartbeatMonitor:
    """Send heartbeats to monitoring services (Dead Man's Snitch, Healthchecks.io)."""

    def __init__(self, ping_url: str):
        self.ping_url = ping_url

    def ping(self, status: str = "ok"):
        """Send heartbeat ping.

        ``"ok"`` hits the base URL; any other status is appended as a path
        segment. Errors are swallowed on purpose.
        """
        url = f"{self.ping_url}/{status}" if status != "ok" else self.ping_url
        try:
            # Use the response as a context manager so it is closed and each
            # ping does not leave a connection open
            with urllib.request.urlopen(url, timeout=10):
                pass
        except Exception:
            pass  # Monitoring failure shouldn't crash the job

    def wrap(self, func):
        """Decorator to add heartbeat to a job.

        Pings "ok" after a successful run, or "fail" before re-raising the
        job's exception.
        """
        from functools import wraps  # keeps func's __name__/__doc__ on the wrapper

        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception:
                self.ping("fail")
                raise
            self.ping("ok")
            return result
        return wrapper


# Usage with Healthchecks.io
monitor = HeartbeatMonitor("https://hc-ping.com/your-uuid-here")

@monitor.wrap
def critical_backup():
    """Backup job placeholder, wrapped so the monitor is pinged on success/failure."""
    # ... backup logic
    return None

Best Practices

1. Make Jobs Idempotent

Jobs will run twice sometimes — on restart, clock skew, or manual trigger. Design them so running twice produces the same result as running once.

import hashlib
import json
from pathlib import Path


def idempotent_process(data: dict, output_dir: str = "processed"):
    """Process data idempotently — skip if already processed.

    A marker file named after a hash of *data* is created in *output_dir*
    once processing succeeds; if the marker already exists the call returns
    without doing any work.
    """
    # Create a unique key from the input data (sort_keys makes the hash
    # independent of dict ordering)
    data_hash = hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()[:16]
    out_dir = Path(output_dir)
    marker = out_dir / f".done_{data_hash}"

    if marker.exists():
        print(f"Already processed: {data_hash}")
        return

    # Do the actual work
    result = transform(data)
    save(result, output_dir)

    # Mark as done. Create the directory first: touch() raises
    # FileNotFoundError when the parent directory does not exist.
    out_dir.mkdir(parents=True, exist_ok=True)
    marker.touch()
    print(f"Processed: {data_hash}")

2. Handle Timezone Correctly

from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+


def get_schedule_time(hour: int, minute: int, tz_name: str) -> datetime:
    """Get next occurrence of a specific time in a timezone.

    Returns a timezone-aware datetime for *hour*:*minute* in *tz_name*: today
    if that time is still ahead, otherwise tomorrow.
    """
    # Imported locally: the imports accompanying this example bring in only
    # datetime and ZoneInfo
    from datetime import timedelta

    tz = ZoneInfo(tz_name)
    now = datetime.now(tz)
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)

    # Today's slot is already in the past (or is right now): use tomorrow's
    if target <= now:
        target += timedelta(days=1)

    return target


# Always store and compare in UTC internally
def log_job_time(job_name: str):
    """Print when *job_name* ran, as a UTC ISO timestamp plus New York local time."""
    utc_stamp = datetime.now(ZoneInfo("UTC")).isoformat()
    local_stamp = datetime.now(ZoneInfo("America/New_York")).strftime('%H:%M %Z')
    print(f"Job '{job_name}' ran at {utc_stamp} UTC ({local_stamp} local)")

3. Graceful Shutdown

import signal
import sys


class GracefulScheduler:
    """Scheduler that handles SIGTERM/SIGINT gracefully.

    Constructing an instance installs handlers for both signals; a handler
    only sets a flag, which the run loop checks between jobs.
    """

    def __init__(self):
        self._shutdown = False
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum, frame):
        """Signal handler: request shutdown; the current job is allowed to finish."""
        print(f"\nReceived signal {signum}, shutting down gracefully...")
        self._shutdown = True

    def run(self, jobs: list):
        """Run scheduler until shutdown signal.

        Each job must provide ``is_due()`` and ``run()``. Once shutdown has
        been requested the process exits via ``sys.exit(0)``.
        """
        # Imported locally: the imports accompanying this example bring in
        # only signal and sys
        import os
        import time

        print(f"Scheduler running with {len(jobs)} jobs (PID: {os.getpid()})")

        while not self._shutdown:
            for job in jobs:
                if self._shutdown:
                    break
                if job.is_due():
                    job.run()
            # Don't add a pointless one-second pause once shutdown was requested
            if not self._shutdown:
                time.sleep(1)

        # Cleanup
        print("Waiting for running jobs to finish...")
        # ... wait for threads/processes
        print("Scheduler stopped cleanly")
        sys.exit(0)

4. Lock Jobs to Prevent Overlapping

import threading
import fcntl
from pathlib import Path


def with_file_lock(lock_name: str):
    """Prevent a job from running concurrently (even across processes).

    The wrapped function takes a non-blocking exclusive ``flock`` on
    ``/tmp/<lock_name>.lock``. If the lock is already held, the call prints a
    notice and returns ``None`` without running the function.
    """
    def decorator(func):
        from functools import wraps  # keeps func's __name__/__doc__ on the wrapper

        @wraps(func)
        def wrapper(*args, **kwargs):
            lock_path = Path(f"/tmp/{lock_name}.lock")

            # The with-block closes the file on every exit path, including an
            # unexpected OSError from flock()
            with open(lock_path, "w") as lock_file:
                try:
                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                except BlockingIOError:
                    print(f"Job '{lock_name}' already running, skipping")
                    return None

                try:
                    return func(*args, **kwargs)
                finally:
                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)

        return wrapper
    return decorator


@with_file_lock("daily_scrape")
def daily_scrape():
    """Scrape job placeholder; the file lock keeps runs from overlapping."""
    # Long-running scrape that might overlap with next schedule
    return None

Complete Example: Multi-Job Scheduler

Here's a production-ready scheduler that combines everything — persistence, monitoring, retry, and graceful shutdown:

#!/usr/bin/env python3
"""
Production Task Scheduler
- Multiple job types (interval, cron-like)
- Persistent state (survives restarts)
- Structured logging
- Retry with backoff
- Graceful shutdown
"""
import json
import time
import signal
import logging
import threading
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Callable

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("scheduler.log"),
    ],
)
logger = logging.getLogger("scheduler")


@dataclass
class JobConfig:
    """Static configuration for one registered job."""

    name: str
    func: Callable            # Called with no arguments when the job runs
    interval_seconds: float
    max_retries: int = 2      # Extra attempts after the first one fails
    retry_delay: float = 5.0
    enabled: bool = True
    overlap: bool = False  # Allow concurrent runs


@dataclass
class JobState:
    """Per-job runtime state, written to and restored from the JSON state file."""

    next_run: str = ""        # Set to an ISO-8601 timestamp when the job is registered
    last_run: str = ""
    last_status: str = "pending"
    run_count: int = 0
    error_count: int = 0
    consecutive_errors: int = 0

class ProductionScheduler:
    """Interval-based job scheduler with retries and JSON-persisted state.

    Jobs are added with :meth:`register` and dispatched by :meth:`start`,
    which polls once per second and runs every due job in its own daemon
    thread. Per-job bookkeeping (``JobState``) is written to ``state_file``
    after each run so that schedules and counters survive a restart.
    """

    def __init__(self, state_file: str = "scheduler_state.json"):
        """Load any persisted state and install SIGTERM/SIGINT handlers.

        Args:
            state_file: Path of the JSON file used to persist job state.
        """
        self.jobs: dict[str, JobConfig] = {}
        self.state: dict[str, JobState] = {}
        self.state_file = Path(state_file)
        self._running = False
        # Job name -> number of runs currently in flight. A count (rather
        # than a plain set) keeps overlapping runs of one job tracked until
        # the last of them finishes.
        self._active_jobs: dict[str, int] = {}
        # Guards _active_jobs and the counters in JobState.
        self._lock = threading.Lock()
        # Serialises writers of the state file.
        self._save_lock = threading.Lock()

        self._load_state()

        try:
            signal.signal(signal.SIGTERM, self._shutdown)
            signal.signal(signal.SIGINT, self._shutdown)
        except ValueError:
            # signal.signal() may only be called from the main thread; the
            # scheduler is still usable, it just cannot react to signals.
            logger.warning(
                "Not running in the main thread; signal handlers not installed"
            )

    def register(self, name: str, func: Callable, interval_seconds: float,
                 max_retries: int = 2, enabled: bool = True,
                 retry_delay: float = 5.0, overlap: bool = False):
        """Register a new job.

        Args:
            name: Unique job name; re-registering a name replaces its config
                but keeps any state already loaded for it.
            func: Zero-argument callable to run.
            interval_seconds: Seconds between runs.
            max_retries: Extra attempts after the first failure.
            enabled: When False the job is registered but never dispatched.
            retry_delay: Base delay in seconds before a retry (doubles on
                each further attempt).
            overlap: Allow a new run to start while a previous one is still
                in progress.
        """
        self.jobs[name] = JobConfig(
            name=name, func=func, interval_seconds=interval_seconds,
            max_retries=max_retries, retry_delay=retry_delay,
            enabled=enabled, overlap=overlap,
        )
        if name not in self.state:
            # New jobs are due immediately.
            self.state[name] = JobState(
                next_run=datetime.now().isoformat()
            )
        logger.info(f"Registered job '{name}' (every {interval_seconds}s)")

    def _load_state(self):
        """Populate ``self.state`` from the state file, if one exists.

        An unreadable or malformed file is logged and ignored so that a
        damaged state file cannot prevent the scheduler from starting.
        """
        if not self.state_file.exists():
            return

        try:
            raw = json.loads(self.state_file.read_text())
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            logger.error(
                f"Could not read state file '{self.state_file}': {e}. "
                f"Starting with empty state"
            )
            return

        if not isinstance(raw, dict):
            logger.error(
                f"State file '{self.state_file}' has unexpected content. "
                f"Starting with empty state"
            )
            return

        for name, data in raw.items():
            try:
                self.state[name] = JobState(**data)
            except TypeError as e:
                # Unknown keys or a non-mapping entry.
                logger.warning(f"Ignoring invalid state for job '{name}': {e}")
        logger.info(f"Loaded state for {len(self.state)} jobs")

    def _save_state(self):
        """Write all job state to the state file.

        The data is written to a sibling temporary file which then replaces
        the real one, so readers never observe a half-written file. Writers
        are serialised because job threads save concurrently.
        """
        with self._save_lock:
            raw = {name: asdict(state) for name, state in list(self.state.items())}
            tmp_file = self.state_file.with_name(self.state_file.name + ".tmp")
            tmp_file.write_text(json.dumps(raw, indent=2))
            tmp_file.replace(self.state_file)

    @staticmethod
    def _due_at(state: JobState) -> datetime:
        """Return when the job is next due; unparsable values mean "now"."""
        try:
            return datetime.fromisoformat(state.next_run)
        except (TypeError, ValueError):
            return datetime.min

    @staticmethod
    def _schedule_next(config: JobConfig, state: JobState):
        """Set ``next_run`` to one interval from the current time."""
        state.next_run = (
            datetime.now() + timedelta(seconds=config.interval_seconds)
        ).isoformat()

    def _claim(self, config: JobConfig) -> bool:
        """Atomically mark a run as started; False if it must be skipped."""
        with self._lock:
            in_flight = self._active_jobs.get(config.name, 0)
            if in_flight and not config.overlap:
                return False
            self._active_jobs[config.name] = in_flight + 1
            return True

    def _release(self, config: JobConfig):
        """Mark one run of the job as finished."""
        with self._lock:
            remaining = self._active_jobs.get(config.name, 0) - 1
            if remaining > 0:
                self._active_jobs[config.name] = remaining
            else:
                self._active_jobs.pop(config.name, None)

    def _call_with_retries(self, config: JobConfig) -> bool:
        """Call the job function, retrying with exponential backoff.

        Returns:
            True if any attempt succeeded, False once all attempts failed.
        """
        for attempt in range(config.max_retries + 1):
            try:
                config.func()
                return True
            except Exception as e:
                if attempt < config.max_retries:
                    delay = config.retry_delay * (2 ** attempt)
                    logger.warning(
                        f"Job '{config.name}' attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {delay:.0f}s"
                    )
                    time.sleep(delay)
                else:
                    logger.error(
                        f"Job '{config.name}' failed after {attempt + 1} attempts: {e}"
                    )
        return False

    def _execute(self, config: JobConfig, state: JobState):
        """Run an already-claimed job, record the outcome, and persist it."""
        try:
            start = time.time()
            success = self._call_with_retries(config)
            duration = (time.time() - start) * 1000
            now = datetime.now()

            with self._lock:
                state.last_run = now.isoformat()
                state.run_count += 1
                if not config.overlap:
                    # Exclusive jobs measure the interval from completion.
                    # Overlapping jobs keep the schedule set at dispatch so
                    # a slow run does not push back the next one.
                    state.next_run = (
                        now + timedelta(seconds=config.interval_seconds)
                    ).isoformat()
                if success:
                    state.last_status = "success"
                    state.consecutive_errors = 0
                else:
                    state.last_status = "error"
                    state.error_count += 1
                    state.consecutive_errors += 1
                consecutive = state.consecutive_errors

            if success:
                logger.info(f"✅ '{config.name}' completed in {duration:.0f}ms")
            else:
                logger.error(
                    f"❌ '{config.name}' failed ({consecutive} consecutive)"
                )
        finally:
            # Always free the slot, otherwise the job could never run again.
            self._release(config)

        self._save_state()

    def _run_job(self, config: JobConfig, state: JobState):
        """Execute a job with retry logic.

        Skips the run (with a warning) when the job does not allow overlap
        and a previous run is still in progress.
        """
        if not self._claim(config):
            logger.warning(f"Job '{config.name}' still running, skipping")
            return
        self._schedule_next(config, state)
        self._execute(config, state)

    def _shutdown(self, signum, frame):
        """Signal handler: ask the main loop in :meth:`start` to stop."""
        logger.info(f"Shutdown signal received ({signum})")
        self._running = False

    def start(self):
        """Run the scheduler until a shutdown signal is received.

        Polls once per second. After the loop ends, waits up to 30 seconds
        for in-flight jobs before saving state and returning.
        """
        self._running = True
        logger.info(f"Scheduler started with {len(self.jobs)} jobs")

        while self._running:
            now = datetime.now()

            for name, config in list(self.jobs.items()):
                if not config.enabled:
                    continue

                state = self.state.get(name)
                if state is None:
                    continue

                if now < self._due_at(state):
                    continue

                claimed = self._claim(config)
                # Advance the schedule at dispatch time. Without this the
                # job stays "due" until it finishes and would be dispatched
                # again on every one-second tick.
                self._schedule_next(config, state)

                if not claimed:
                    logger.warning(f"Job '{name}' still running, skipping")
                    continue

                thread = threading.Thread(
                    target=self._execute, args=(config, state),
                    name=f"job-{name}", daemon=True,
                )
                thread.start()

            time.sleep(1)

        # Wait for active jobs
        logger.info("Waiting for active jobs to finish...")
        for _ in range(30):
            if not self._active_jobs:
                break
            time.sleep(1)
        else:
            with self._lock:
                unfinished = sorted(self._active_jobs)
            if unfinished:
                logger.warning(
                    f"Gave up waiting for jobs still running: {unfinished}"
                )

        self._save_state()
        logger.info("Scheduler stopped")

    def status(self):
        """Print status of all jobs."""
        print(f"\n{'Job':<25} {'Status':<10} {'Runs':<8} {'Errors':<8} {'Next Run'}")
        print("-" * 80)
        for name, config in self.jobs.items():
            state = self.state.get(name, JobState())
            enabled = "✓" if config.enabled else "✗"
            print(
                f"{enabled} {name:<23} {state.last_status:<10} "
                f"{state.run_count:<8} {state.error_count:<8} "
                f"{state.next_run[:19]}"
            )
        print()


# === Define your jobs ===

import shutil
import os


def check_disk_space():
    """Log usage of the root filesystem and fail when it is above 90%.

    Raises:
        RuntimeError: If more than 90% of the disk is in use.
    """
    total_bytes, used_bytes, free_bytes = shutil.disk_usage("/")
    pct = used_bytes / total_bytes * 100
    free_gb = free_bytes // (1024**3)
    logger.info(f"Disk: {pct:.1f}% used ({free_gb}GB free)")

    if pct <= 90:
        return
    raise RuntimeError(f"Disk space critical: {pct:.1f}%")


def cleanup_logs():
    """Delete ``*.log`` files in ``logs/`` not modified in the last 7 days.

    Does nothing when ``logs`` is missing or is not a directory. Only
    regular files are removed: a directory that happens to match ``*.log``
    is skipped, and a file that disappears while the scan is running is
    ignored rather than failing the whole job.
    """
    log_dir = Path("logs")
    if not log_dir.is_dir():
        return
    cutoff = time.time() - (7 * 86400)  # 7 days
    removed = 0
    for f in log_dir.glob("*.log"):
        try:
            if not f.is_file() or f.stat().st_mtime >= cutoff:
                continue
            f.unlink()
        except FileNotFoundError:
            # Removed by someone else between glob() and stat()/unlink().
            continue
        removed += 1
    logger.info(f"Removed {removed} old log files")


def check_services():
    """Probe each configured service URL and log the result.

    Raises:
        Exception: Re-raises whatever ``urlopen`` raised for the first
            service that cannot be reached within 5 seconds, after logging
            it, so the scheduler records the run as failed.
    """
    import urllib.request
    services = {
        "Google DNS": "https://dns.google/resolve?name=example.com",
    }
    for name, url in services.items():
        try:
            # The response is only opened to prove reachability; the `with`
            # closes the connection instead of leaking it on every check.
            with urllib.request.urlopen(url, timeout=5):
                pass
        except Exception as e:
            logger.error(f"  ✗ {name}: {e}")
            raise
        logger.info(f"  ✓ {name}")


if __name__ == "__main__":
    sched = ProductionScheduler()

    # (job name, callable, interval in seconds), registered in this order.
    for job_name, job_func, every_seconds in (
        ("disk_check", check_disk_space, 300),
        ("log_cleanup", cleanup_logs, 86400),
        ("service_health", check_services, 600),
    ):
        sched.register(job_name, job_func, interval_seconds=every_seconds)

    # Show the job table once, then block in the scheduling loop.
    sched.status()
    sched.start()

Want 50+ Ready-to-Use Python Automation Scripts?

The AI Toolkit includes scheduling, web scraping, API integrations, data pipelines, email automation, and more — all production-ready with error handling and documentation.

Get the AI Toolkit — $19