Every production system has work that needs to happen on a schedule. Scrape prices every hour. Clean up temp files at midnight. Send weekly reports on Monday morning. Generate backups every 6 hours. Without proper scheduling, you end up with a mess of shell scripts, manual cron entries, and no visibility into what's running.
Python gives you multiple ways to schedule tasks, from dead-simple one-liners to enterprise-grade distributed queues. The right choice depends on your needs: the schedule library for simple in-process jobs, APScheduler for cron-style triggers and persistence, python-crontab for managing system cron entries, Celery Beat for distributed workloads, or a small standard-library scheduler when you want zero dependencies.
Let's go through each approach with real, working code.
The schedule library is the fastest way to add periodic tasks to a Python script. Install it:
pip install schedule
import schedule
import time
def check_prices():
print("Checking prices...")
# Your scraping logic here
def send_daily_report():
print("Sending daily report...")
# Your email/notification logic here
def cleanup_temp_files():
print("Cleaning up...")
# Your cleanup logic here
# Schedule jobs
schedule.every(30).minutes.do(check_prices)
schedule.every().day.at("09:00").do(send_daily_report)
schedule.every().monday.at("08:00").do(cleanup_temp_files)
schedule.every(6).hours.do(lambda: print("Health check OK"))
# Run the scheduler
while True:
schedule.run_pending()
time.sleep(1)
The API is beautifully readable: schedule.every(30).minutes.do(fn) does exactly what it says. You can also pass positional and keyword arguments straight through to the job function:
def scrape_site(url: str, max_pages: int = 10):
    print(f"Scraping {url} (up to {max_pages} pages)")
# Pass arguments directly after the function reference
schedule.every(2).hours.do(scrape_site, "https://example.com", max_pages=50)
schedule.every(4).hours.do(scrape_site, "https://another.com")
For a task that should run only once, return schedule.CancelJob and the scheduler removes the job after its first run:
import schedule
def one_time_setup():
print("Running initial setup...")
return schedule.CancelJob # Removes itself after running
schedule.every(1).seconds.do(one_time_setup)
run_pending() only fires jobs when you call it, so the loop above ties up the main thread. To keep the main thread free, run the loop in a daemon thread:
import schedule
import threading
import time
def run_scheduler():
"""Run scheduler in a background thread."""
while True:
schedule.run_pending()
time.sleep(1)
# Set up jobs
schedule.every(10).minutes.do(lambda: print("Background task"))
# Start scheduler thread
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
# Main thread continues doing other work
print("Main thread is free to do other things")
while True:
time.sleep(60)
print("Main thread heartbeat")
When you need persistence, multiple trigger types, timezone support, or job stores, APScheduler is the go-to. It's a full-featured scheduling framework.
pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta
scheduler = BlockingScheduler()
def fetch_data():
print(f"[{datetime.now():%H:%M:%S}] Fetching data...")
def generate_report():
print(f"[{datetime.now():%H:%M:%S}] Generating report...")
def one_time_task():
print("This runs exactly once!")
# Interval trigger — every 5 minutes
scheduler.add_job(fetch_data, IntervalTrigger(minutes=5), id="fetch_data")
# Cron trigger — weekdays at 9:00 AM
scheduler.add_job(
generate_report,
CronTrigger(day_of_week="mon-fri", hour=9, minute=0),
id="daily_report",
)
# Date trigger — one-shot at specific time
scheduler.add_job(
one_time_task,
DateTrigger(run_date=datetime.now() + timedelta(hours=1)),
id="one_shot",
)
print("Scheduler starting...")
scheduler.start()
# Full cron expression support
from apscheduler.triggers.cron import CronTrigger
# Every 15 minutes
CronTrigger.from_crontab("*/15 * * * *")
# Weekdays at 8:30 AM
CronTrigger.from_crontab("30 8 * * 1-5")
# First day of every month at midnight
CronTrigger.from_crontab("0 0 1 * *")
# Every Sunday at 6 PM
CronTrigger.from_crontab("0 18 * * 0")
# With timezone
CronTrigger.from_crontab("0 9 * * *", timezone="America/New_York")
With a job store, APScheduler persists jobs to a database so they survive restarts:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.interval import IntervalTrigger
# Jobs survive restarts — stored in SQLite
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
}
scheduler = BlockingScheduler(jobstores=jobstores)
def my_job():
print("Running persistent job!")
# replace_existing=True prevents duplicates on restart
scheduler.add_job(
my_job,
IntervalTrigger(minutes=10),
id="persistent_job",
replace_existing=True,
)
scheduler.start()
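If the process was down when a persistent job was due, APScheduler's misfire handling decides what happens at restart. A sketch of the relevant add_job options, with assumed values:
scheduler.add_job(
    my_job,
    IntervalTrigger(minutes=10),
    id="persistent_job",
    replace_existing=True,
    misfire_grace_time=300,  # still run a missed job if we're within 5 minutes of its due time
    coalesce=True,           # collapse a backlog of missed runs into a single run
    max_instances=1,         # never run two copies of this job concurrently
)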
BlockingScheduler takes over the process. When the scheduler should run alongside your application, use BackgroundScheduler:
from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
scheduler.add_job(lambda: print("tick"), "interval", seconds=30)
scheduler.start()
# Your application continues running
try:
while True:
time.sleep(1)
# Main application logic here
except KeyboardInterrupt:
scheduler.shutdown(wait=True) # Graceful shutdown
For asyncio applications, AsyncIOScheduler runs coroutine jobs on the event loop:
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def async_task():
print("Running async task...")
await asyncio.sleep(1)
print("Async task done!")
scheduler = AsyncIOScheduler()
scheduler.add_job(async_task, "interval", minutes=5)
scheduler.start()
# Run with asyncio event loop
asyncio.get_event_loop().run_forever()
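On recent Python versions, where implicitly creating an event loop is discouraged, the same idea can be expressed with asyncio.run; a sketch:
async def main():
    scheduler = AsyncIOScheduler()
    scheduler.add_job(async_task, "interval", minutes=5)
    scheduler.start()
    # Keep the loop alive; the scheduler fires jobs on it
    while True:
        await asyncio.sleep(3600)

asyncio.run(main())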
Sometimes you want Python to manage system crontab entries — adding, removing, or listing cron jobs programmatically. The python-crontab library wraps the system crontab.
pip install python-crontab
from crontab import CronTab
# Access current user's crontab
cron = CronTab(user=True)
# Create a new cron job
job = cron.new(command="/usr/bin/python3 /home/user/scripts/backup.py")
job.setall("0 2 * * *") # Daily at 2:00 AM
job.set_comment("Daily backup script")
# Enable/disable
job.enable(False) # Disable
job.enable(True) # Re-enable
# Write changes to crontab
cron.write()
# List all cron jobs
for job in cron:
print(f"Schedule: {job.slices} | Command: {job.command} | Enabled: {job.is_enabled()}")
# Find jobs by comment (exact match on the comment set above)
backups = cron.find_comment("Daily backup script")
for job in backups:
print(f"Found: {job}")
# Remove a job
cron.remove(job)
cron.write()
python-crontab understands standard cron syntax and can compute upcoming run times:
from crontab import CronTab
cron = CronTab(user=True)
job = cron.new(command="echo test")
# These all work
job.setall("*/5 * * * *") # Every 5 minutes
job.setall("0 9 * * 1-5") # Weekdays 9 AM
job.setall("0 0 1,15 * *") # 1st and 15th of month
# Check run times (job.schedule() requires the croniter package)
from datetime import datetime
schedule = job.schedule(date_from=datetime.now())
print(f"Next run: {schedule.get_next()}")
print(f"Previous run: {schedule.get_prev()}")
# Get the next 5 run times; each get_next() call advances the cursor
for i in range(5):
    print(f" Run {i+1}: {schedule.get_next()}")
For serious production workloads — tasks distributed across multiple workers, automatic retries, result tracking — Celery Beat is the industry standard.
pip install celery[redis]
# celery_app.py
from celery import Celery
from celery.schedules import crontab
app = Celery("myapp", broker="redis://localhost:6379/0")
# Configure periodic tasks
app.conf.beat_schedule = {
"scrape-every-hour": {
"task": "tasks.scrape_prices",
"schedule": 3600.0, # seconds
},
"daily-report": {
"task": "tasks.generate_report",
"schedule": crontab(hour=9, minute=0),
},
"weekly-cleanup": {
"task": "tasks.cleanup_old_data",
"schedule": crontab(hour=0, minute=0, day_of_week="sunday"),
},
"every-5-minutes": {
"task": "tasks.health_check",
"schedule": 300.0,
"args": ("production",), # Pass arguments
},
}
app.conf.timezone = "America/New_York"
# tasks.py
from celery_app import app
import logging
logger = logging.getLogger(__name__)
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def scrape_prices(self):
"""Scrape prices with automatic retry on failure."""
try:
logger.info("Scraping prices...")
# Your scraping logic
prices = {"BTC": 65000, "ETH": 3400}
return prices
except Exception as exc:
logger.error(f"Scrape failed: {exc}")
raise self.retry(exc=exc)
@app.task
def generate_report():
"""Generate and email daily report."""
logger.info("Generating report...")
# Build report, send email
return "Report sent"
@app.task
def cleanup_old_data():
"""Remove data older than 90 days."""
logger.info("Cleaning up...")
# Delete old records
return "Cleanup complete"
@app.task
def health_check(environment: str):
"""Check system health."""
logger.info(f"Health check for {environment}")
# Check DB, Redis, API endpoints
return {"status": "healthy", "env": environment}
# Terminal 1: Start the worker
celery -A celery_app worker --loglevel=info
# Terminal 2: Start the beat scheduler
celery -A celery_app beat --loglevel=info
# Or combine both (for development)
celery -A celery_app worker --beat --loglevel=info
# Add a task at runtime
from celery_app import app
from celery.schedules import crontab
# Using database-backed scheduler (django-celery-beat or similar)
# allows adding/removing tasks without restart
# Manual one-off scheduled task
from tasks import scrape_prices
result = scrape_prices.apply_async(countdown=300) # Run in 5 minutes
print(f"Task ID: {result.id}")
# Check result
print(f"Status: {result.status}")
print(f"Result: {result.get(timeout=60)}")
Sometimes you don't want external dependencies. Here's a minimal but functional task scheduler using only the standard library:
import threading
import time
import logging
from datetime import datetime, timedelta
from typing import Callable
from dataclasses import dataclass, field
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
@dataclass
class Job:
name: str
func: Callable
interval_seconds: float
args: tuple = ()
kwargs: dict = field(default_factory=dict)
next_run: datetime = field(default_factory=datetime.now)
last_run: datetime | None = None
run_count: int = 0
error_count: int = 0
enabled: bool = True
class SimpleScheduler:
"""Lightweight task scheduler — no external dependencies."""
def __init__(self):
self.jobs: dict[str, Job] = {}
self._running = False
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
def every(self, seconds: float, name: str | None = None):
"""Decorator to register a periodic job."""
def decorator(func: Callable):
job_name = name or func.__name__
self.add_job(job_name, func, seconds)
return func
return decorator
def add_job(self, name: str, func: Callable, interval_seconds: float,
args: tuple = (), kwargs: dict | None = None):
"""Register a job with the scheduler."""
with self._lock:
self.jobs[name] = Job(
name=name,
func=func,
interval_seconds=interval_seconds,
args=args,
kwargs=kwargs or {},
)
logger.info(f"Job '{name}' registered (every {interval_seconds}s)")
def remove_job(self, name: str):
"""Remove a job by name."""
with self._lock:
if name in self.jobs:
del self.jobs[name]
logger.info(f"Job '{name}' removed")
def _run_job(self, job: Job):
"""Execute a single job with error handling."""
try:
logger.info(f"Running job '{job.name}'...")
job.func(*job.args, **job.kwargs)
job.run_count += 1
job.last_run = datetime.now()
logger.info(f"Job '{job.name}' completed (run #{job.run_count})")
except Exception as e:
job.error_count += 1
logger.error(f"Job '{job.name}' failed: {e} (errors: {job.error_count})")
def _loop(self):
"""Main scheduler loop."""
while self._running:
now = datetime.now()
with self._lock:
due_jobs = [
j for j in self.jobs.values()
if j.enabled and j.next_run <= now
]
for job in due_jobs:
self._run_job(job)
job.next_run = datetime.now() + timedelta(seconds=job.interval_seconds)
time.sleep(0.5) # Check every 500ms
def start(self, background: bool = False):
"""Start the scheduler."""
self._running = True
logger.info(f"Scheduler started with {len(self.jobs)} jobs")
if background:
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
else:
try:
self._loop()
except KeyboardInterrupt:
self.stop()
def stop(self):
"""Stop the scheduler gracefully."""
self._running = False
if self._thread:
self._thread.join(timeout=5)
logger.info("Scheduler stopped")
def status(self) -> list[dict]:
"""Get status of all jobs."""
with self._lock:
return [
{
"name": j.name,
"enabled": j.enabled,
"interval": j.interval_seconds,
"runs": j.run_count,
"errors": j.error_count,
"last_run": j.last_run.isoformat() if j.last_run else None,
"next_run": j.next_run.isoformat(),
}
for j in self.jobs.values()
]
# Usage
scheduler = SimpleScheduler()
@scheduler.every(60, name="heartbeat")
def heartbeat():
print(f"❤️ alive at {datetime.now():%H:%M:%S}")
@scheduler.every(300, name="check_disk")
def check_disk():
import shutil
usage = shutil.disk_usage("/")
pct = usage.used / usage.total * 100
print(f"Disk usage: {pct:.1f}%")
if pct > 90:
print("⚠️ Disk space critical!")
@scheduler.every(3600, name="cleanup")
def cleanup():
from pathlib import Path
tmp = Path("/tmp")
old_files = [f for f in tmp.glob("myapp_*") if f.stat().st_mtime < time.time() - 86400]
for f in old_files:
f.unlink()
print(f"Cleaned {len(old_files)} temp files")
if __name__ == "__main__":
scheduler.start() # Blocking — Ctrl+C to stop
The building blocks above cover most needs. Here are a few real-world patterns, starting with an hourly scraper that appends to a daily JSONL file:
import schedule
import json
import logging
from datetime import datetime
from pathlib import Path
logger = logging.getLogger(__name__)
def scrape_and_store():
"""Scrape data and append to daily JSON file."""
import urllib.request
url = "https://api.example.com/prices"
try:
with urllib.request.urlopen(url, timeout=30) as resp:
data = json.loads(resp.read())
except Exception as e:
logger.error(f"Scrape failed: {e}")
return
# Store with timestamp
entry = {
"timestamp": datetime.now().isoformat(),
"data": data,
}
output = Path(f"data/prices_{datetime.now():%Y%m%d}.jsonl")
output.parent.mkdir(exist_ok=True)
with open(output, "a") as f:
f.write(json.dumps(entry) + "\n")
logger.info(f"Stored {len(data)} prices → {output}")
# Run every hour, plus once at startup
scrape_and_store()
schedule.every(1).hour.do(scrape_and_store)
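As before, nothing runs until run_pending() is called in a loop; a minimal sketch, assuming this script is the long-running process:
import time

while True:
    schedule.run_pending()
    time.sleep(30)  # coarse polling is fine for hourly jobs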
Next, a retention job that prunes old rows from a SQLite database:
import sqlite3
from datetime import datetime, timedelta
def cleanup_database(db_path: str, retention_days: int = 90):
"""Remove records older than retention period."""
cutoff = datetime.now() - timedelta(days=retention_days)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
tables = {
"logs": "created_at",
"sessions": "last_active",
"temp_data": "timestamp",
}
total_deleted = 0
for table, date_col in tables.items():
cursor.execute(
f"DELETE FROM {table} WHERE {date_col} < ?",
(cutoff.isoformat(),)
)
deleted = cursor.rowcount
total_deleted += deleted
if deleted > 0:
print(f" {table}: deleted {deleted} rows")
    # Commit the deletes first; VACUUM cannot run inside a transaction
    conn.commit()
    cursor.execute("VACUUM")  # Reclaim space
    conn.close()
print(f"Cleanup complete: {total_deleted} total rows deleted")
return total_deleted
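Wiring this into any of the schedulers above takes one line. For example, with the schedule library (the "app.db" path is just a placeholder):
import schedule

# Prune a local SQLite file every night at 03:00
schedule.every().day.at("03:00").do(cleanup_database, "app.db", retention_days=90)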
And a periodic health check that alerts when a service goes down:
import urllib.request
import json
import time
import logging
logger = logging.getLogger(__name__)
def check_endpoints():
"""Check health of multiple services and alert on failures."""
endpoints = {
"API": "https://api.myapp.com/health",
"Docs": "https://docs.myapp.com",
"Dashboard": "https://dash.myapp.com/ping",
}
results = {}
for name, url in endpoints.items():
start = time.time()
try:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=10) as resp:
status = resp.status
latency = (time.time() - start) * 1000
results[name] = {
"status": "up",
"code": status,
"latency_ms": round(latency),
}
except Exception as e:
results[name] = {
"status": "down",
"error": str(e),
}
logger.error(f"🔴 {name} is DOWN: {e}")
# Summary
up = sum(1 for r in results.values() if r["status"] == "up")
total = len(results)
logger.info(f"Health check: {up}/{total} services up")
if up < total:
send_alert(results)
return results
def send_alert(results: dict):
"""Send alert for failed services."""
failed = {k: v for k, v in results.items() if v["status"] == "down"}
msg = "⚠️ Service Alert:\n"
for name, info in failed.items():
msg += f" 🔴 {name}: {info.get('error', 'unknown')}\n"
# Send via email, Slack, Telegram, etc.
print(msg)
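A plausible way to schedule it, reusing the schedule-library loop from earlier:
import schedule

schedule.every(5).minutes.do(check_endpoints)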
Scheduled tasks fail. Networks go down, APIs return errors, disks fill up. Your scheduler needs to handle this gracefully.
import time
import logging
from functools import wraps
logger = logging.getLogger(__name__)
def with_retry(max_retries: int = 3, base_delay: float = 1.0,
max_delay: float = 60.0, exceptions: tuple = (Exception,)):
"""Decorator that adds retry logic with exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_retries:
logger.error(
f"{func.__name__} failed after {max_retries + 1} "
f"attempts: {e}"
)
raise
delay = min(base_delay * (2 ** attempt), max_delay)
logger.warning(
f"{func.__name__} attempt {attempt + 1} failed: {e}. "
f"Retrying in {delay:.1f}s"
)
time.sleep(delay)
return wrapper
return decorator
@with_retry(max_retries=3, base_delay=2.0)
def fetch_api_data(url: str) -> dict:
    import json
    import urllib.request
    with urllib.request.urlopen(url, timeout=30) as resp:
        return json.loads(resp.read())
# Usage in a scheduled job
def scheduled_fetch():
    try:
        data = fetch_api_data("https://api.example.com/data")
        process_data(data)  # placeholder for your downstream handler
    except Exception:
        # All retries exhausted — log and move on
        # The next scheduled run will try again
        logger.error("Fetch completely failed, will retry next cycle")
When retries are exhausted, record the failure in a dead-letter queue so you can inspect and replay it later:
import json
from pathlib import Path
from datetime import datetime
class DeadLetterQueue:
"""Store failed job executions for later inspection and replay."""
def __init__(self, path: str = "dead_letters.jsonl"):
self.path = Path(path)
def add(self, job_name: str, error: str, context: dict = None):
"""Record a failed job execution."""
entry = {
"timestamp": datetime.now().isoformat(),
"job": job_name,
"error": error,
"context": context or {},
}
with open(self.path, "a") as f:
f.write(json.dumps(entry) + "\n")
def get_failures(self, job_name: str = None, limit: int = 50) -> list:
"""Retrieve recent failures, optionally filtered by job name."""
if not self.path.exists():
return []
failures = []
for line in self.path.read_text().strip().split("\n"):
if not line:
continue
entry = json.loads(line)
if job_name and entry["job"] != job_name:
continue
failures.append(entry)
return failures[-limit:]
def replay(self, job_registry: dict):
"""Attempt to replay all failed jobs."""
failures = self.get_failures()
replayed = 0
for entry in failures:
job_func = job_registry.get(entry["job"])
if job_func:
try:
job_func(**entry.get("context", {}))
replayed += 1
except Exception as e:
print(f"Replay failed for {entry['job']}: {e}")
print(f"Replayed {replayed}/{len(failures)} failed jobs")
# Usage
dlq = DeadLetterQueue()
def risky_job():
try:
# ... do work
pass
except Exception as e:
dlq.add("risky_job", str(e), {"attempt_time": datetime.now().isoformat()})
raise
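Replaying later just needs a mapping from job names to callables. Note that replay() passes the stored context back as keyword arguments, so the target has to accept (or swallow) them; a sketch:
# Map recorded job names to the callables that should re-run them
job_registry = {"risky_job": lambda **ctx: risky_job()}

# e.g. invoked from a maintenance script or a weekly scheduled job
dlq.replay(job_registry)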
Structured, machine-readable logs make it easy to see what ran, when, and for how long:
import logging
import json
from datetime import datetime
class JobLogger:
"""Structured logging for scheduled jobs."""
def __init__(self, log_file: str = "jobs.log"):
self.logger = logging.getLogger("scheduler")
handler = logging.FileHandler(log_file)
handler.setFormatter(logging.Formatter("%(message)s"))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_start(self, job_name: str):
self._log("START", job_name)
def log_success(self, job_name: str, duration_ms: float, result: str = ""):
self._log("SUCCESS", job_name, duration_ms=duration_ms, result=result)
def log_failure(self, job_name: str, error: str, duration_ms: float = 0):
self._log("FAILURE", job_name, duration_ms=duration_ms, error=error)
def _log(self, event: str, job_name: str, **extra):
entry = {
"ts": datetime.now().isoformat(),
"event": event,
"job": job_name,
**extra,
}
self.logger.info(json.dumps(entry))
# Usage
job_log = JobLogger()
def monitored_job(name: str, func, *args, **kwargs):
"""Wrap any function with monitoring."""
import time
job_log.log_start(name)
start = time.time()
try:
result = func(*args, **kwargs)
duration = (time.time() - start) * 1000
job_log.log_success(name, duration, str(result)[:200])
return result
except Exception as e:
duration = (time.time() - start) * 1000
job_log.log_failure(name, str(e), duration)
raise
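Combined with the schedule library, wrapping a job is a one-liner (scrape_and_store here refers to the earlier scraping example):
import schedule

schedule.every(30).minutes.do(lambda: monitored_job("scrape", scrape_and_store))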
To catch jobs that silently stop running, ping an external heartbeat service on every run:
import urllib.request
class HeartbeatMonitor:
"""Send heartbeats to monitoring services (Dead Man's Snitch, Healthchecks.io)."""
def __init__(self, ping_url: str):
self.ping_url = ping_url
def ping(self, status: str = "ok"):
"""Send heartbeat ping."""
url = f"{self.ping_url}/{status}" if status != "ok" else self.ping_url
try:
urllib.request.urlopen(url, timeout=10)
except Exception:
pass # Monitoring failure shouldn't crash the job
def wrap(self, func):
"""Decorator to add heartbeat to a job."""
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
self.ping("ok")
return result
except Exception as e:
self.ping("fail")
raise
return wrapper
# Usage with Healthchecks.io
monitor = HeartbeatMonitor("https://hc-ping.com/your-uuid-here")
@monitor.wrap
def critical_backup():
"""This job pings healthchecks.io on success/failure."""
# ... backup logic
pass
Jobs will run twice sometimes — on restart, clock skew, or manual trigger. Design them so running twice produces the same result as running once.
import hashlib
import json
from pathlib import Path
def idempotent_process(data: dict, output_dir: str = "processed"):
"""Process data idempotently — skip if already processed."""
# Create a unique key from the input data
data_hash = hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()[:16]
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    marker = out_dir / f".done_{data_hash}"
    if marker.exists():
        print(f"Already processed: {data_hash}")
        return
    # Do the actual work (transform/save stand in for your own logic)
    result = transform(data)
    save(result, output_dir)
    # Mark as done
    marker.touch()
    print(f"Processed: {data_hash}")
Timezones and daylight-saving transitions are a classic source of scheduling bugs. Use zoneinfo for local times and keep internal timestamps in UTC:
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # Python 3.9+
def get_schedule_time(hour: int, minute: int, tz_name: str) -> datetime:
"""Get next occurrence of a specific time in a timezone."""
tz = ZoneInfo(tz_name)
now = datetime.now(tz)
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
return target
# Always store and compare in UTC internally
def log_job_time(job_name: str):
utc_now = datetime.now(ZoneInfo("UTC"))
local_now = datetime.now(ZoneInfo("America/New_York"))
print(f"Job '{job_name}' ran at {utc_now.isoformat()} UTC ({local_now.strftime('%H:%M %Z')} local)")
Containers and process managers send SIGTERM on shutdown; catch it so in-flight jobs can finish cleanly:
import os
import signal
import sys
import time
class GracefulScheduler:
"""Scheduler that handles SIGTERM/SIGINT gracefully."""
def __init__(self):
self._shutdown = False
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def _handle_signal(self, signum, frame):
print(f"\nReceived signal {signum}, shutting down gracefully...")
self._shutdown = True
def run(self, jobs: list):
"""Run scheduler until shutdown signal."""
print(f"Scheduler running with {len(jobs)} jobs (PID: {os.getpid()})")
while not self._shutdown:
for job in jobs:
if self._shutdown:
break
if job.is_due():
job.run()
time.sleep(1)
# Cleanup
print("Waiting for running jobs to finish...")
# ... wait for threads/processes
print("Scheduler stopped cleanly")
sys.exit(0)
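The loop above assumes job objects exposing is_due() and run(); a minimal illustrative stand-in:
from datetime import datetime, timedelta

class IntervalJob:
    """Tiny job object compatible with GracefulScheduler.run() (illustrative only)."""
    def __init__(self, func, interval_seconds: float):
        self.func = func
        self.interval = timedelta(seconds=interval_seconds)
        self.next_run = datetime.now()

    def is_due(self) -> bool:
        return datetime.now() >= self.next_run

    def run(self):
        self.next_run = datetime.now() + self.interval
        self.func()

GracefulScheduler().run([IntervalJob(lambda: print("tick"), 60)])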
A long-running job can overlap its own next scheduled run. A file lock (POSIX-only, via fcntl) makes sure only one instance runs at a time, even across processes:
import fcntl
from pathlib import Path
def with_file_lock(lock_name: str):
"""Prevent a job from running concurrently (even across processes)."""
def decorator(func):
def wrapper(*args, **kwargs):
lock_path = Path(f"/tmp/{lock_name}.lock")
lock_file = open(lock_path, "w")
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
print(f"Job '{lock_name}' already running, skipping")
lock_file.close()
return None
try:
return func(*args, **kwargs)
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
lock_file.close()
return wrapper
return decorator
@with_file_lock("daily_scrape")
def daily_scrape():
"""Only one instance of this can run at a time."""
# Long-running scrape that might overlap with next schedule
pass
Here's a production-ready scheduler that combines everything — persistence, monitoring, retry, and graceful shutdown:
#!/usr/bin/env python3
"""
Production Task Scheduler
- Multiple job types (interval, cron-like)
- Persistent state (survives restarts)
- Structured logging
- Retry with backoff
- Graceful shutdown
"""
import json
import time
import signal
import logging
import threading
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Callable
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("scheduler.log"),
],
)
logger = logging.getLogger("scheduler")
@dataclass
class JobConfig:
name: str
func: Callable
interval_seconds: float
max_retries: int = 2
retry_delay: float = 5.0
enabled: bool = True
overlap: bool = False # Allow concurrent runs
@dataclass
class JobState:
next_run: str = ""
last_run: str = ""
last_status: str = "pending"
run_count: int = 0
error_count: int = 0
consecutive_errors: int = 0
class ProductionScheduler:
def __init__(self, state_file: str = "scheduler_state.json"):
self.jobs: dict[str, JobConfig] = {}
self.state: dict[str, JobState] = {}
self.state_file = Path(state_file)
self._running = False
self._active_jobs: set[str] = set()
self._lock = threading.Lock()
self._load_state()
signal.signal(signal.SIGTERM, self._shutdown)
signal.signal(signal.SIGINT, self._shutdown)
def register(self, name: str, func: Callable, interval_seconds: float,
max_retries: int = 2, enabled: bool = True):
"""Register a new job."""
self.jobs[name] = JobConfig(
name=name, func=func, interval_seconds=interval_seconds,
max_retries=max_retries, enabled=enabled,
)
if name not in self.state:
self.state[name] = JobState(
next_run=datetime.now().isoformat()
)
logger.info(f"Registered job '{name}' (every {interval_seconds}s)")
def _load_state(self):
if self.state_file.exists():
raw = json.loads(self.state_file.read_text())
for name, data in raw.items():
self.state[name] = JobState(**data)
logger.info(f"Loaded state for {len(self.state)} jobs")
def _save_state(self):
raw = {name: asdict(state) for name, state in self.state.items()}
self.state_file.write_text(json.dumps(raw, indent=2))
def _run_job(self, config: JobConfig, state: JobState):
"""Execute a job with retry logic."""
if not config.overlap and config.name in self._active_jobs:
logger.warning(f"Job '{config.name}' still running, skipping")
return
with self._lock:
self._active_jobs.add(config.name)
start = time.time()
success = False
for attempt in range(config.max_retries + 1):
try:
config.func()
success = True
break
except Exception as e:
if attempt < config.max_retries:
delay = config.retry_delay * (2 ** attempt)
logger.warning(
f"Job '{config.name}' attempt {attempt + 1} failed: {e}. "
f"Retrying in {delay:.0f}s"
)
time.sleep(delay)
else:
logger.error(
f"Job '{config.name}' failed after {attempt + 1} attempts: {e}"
)
duration = (time.time() - start) * 1000
now = datetime.now()
state.last_run = now.isoformat()
state.run_count += 1
state.next_run = (now + timedelta(seconds=config.interval_seconds)).isoformat()
if success:
state.last_status = "success"
state.consecutive_errors = 0
logger.info(f"✅ '{config.name}' completed in {duration:.0f}ms")
else:
state.last_status = "error"
state.error_count += 1
state.consecutive_errors += 1
logger.error(
f"❌ '{config.name}' failed ({state.consecutive_errors} consecutive)"
)
with self._lock:
self._active_jobs.discard(config.name)
self._save_state()
def _shutdown(self, signum, frame):
logger.info(f"Shutdown signal received ({signum})")
self._running = False
def start(self):
"""Run the scheduler."""
self._running = True
logger.info(f"Scheduler started with {len(self.jobs)} jobs")
while self._running:
now = datetime.now()
for name, config in self.jobs.items():
if not config.enabled:
continue
state = self.state.get(name)
if not state:
continue
                next_run = datetime.fromisoformat(state.next_run)
                if now >= next_run:
                    # Advance next_run before spawning so the loop doesn't
                    # re-trigger the job on every tick while it is running
                    state.next_run = (
                        now + timedelta(seconds=config.interval_seconds)
                    ).isoformat()
                    thread = threading.Thread(
                        target=self._run_job, args=(config, state),
                        name=f"job-{name}", daemon=True,
                    )
                    thread.start()
time.sleep(1)
# Wait for active jobs
logger.info("Waiting for active jobs to finish...")
for _ in range(30):
if not self._active_jobs:
break
time.sleep(1)
self._save_state()
logger.info("Scheduler stopped")
def status(self):
"""Print status of all jobs."""
print(f"\n{'Job':<25} {'Status':<10} {'Runs':<8} {'Errors':<8} {'Next Run'}")
print("-" * 80)
for name, config in self.jobs.items():
state = self.state.get(name, JobState())
enabled = "✓" if config.enabled else "✗"
print(
f"{enabled} {name:<23} {state.last_status:<10} "
f"{state.run_count:<8} {state.error_count:<8} "
f"{state.next_run[:19]}"
)
print()
# === Define your jobs ===
import shutil
import os
def check_disk_space():
usage = shutil.disk_usage("/")
pct = usage.used / usage.total * 100
logger.info(f"Disk: {pct:.1f}% used ({usage.free // (1024**3)}GB free)")
if pct > 90:
raise RuntimeError(f"Disk space critical: {pct:.1f}%")
def cleanup_logs():
log_dir = Path("logs")
if not log_dir.exists():
return
cutoff = time.time() - (7 * 86400) # 7 days
removed = 0
for f in log_dir.glob("*.log"):
if f.stat().st_mtime < cutoff:
f.unlink()
removed += 1
logger.info(f"Removed {removed} old log files")
def check_services():
import urllib.request
services = {
"Google DNS": "https://dns.google/resolve?name=example.com",
}
for name, url in services.items():
try:
urllib.request.urlopen(url, timeout=5)
logger.info(f" ✓ {name}")
except Exception as e:
logger.error(f" ✗ {name}: {e}")
raise
if __name__ == "__main__":
sched = ProductionScheduler()
sched.register("disk_check", check_disk_space, interval_seconds=300)
sched.register("log_cleanup", cleanup_logs, interval_seconds=86400)
sched.register("service_health", check_services, interval_seconds=600)
sched.status()
sched.start()
The AI Toolkit includes scheduling, web scraping, API integrations, data pipelines, email automation, and more — all production-ready with error handling and documentation.
Get the AI Toolkit — $19