🤖 AI Agent Toolkit

Python File Automation — Organize, Rename, Backup & Sync Your Files

The average developer spends hours every week on file management — sorting downloads, renaming screenshots, backing up project folders, hunting duplicates. All of it is scriptable.

This guide covers six production-ready Python scripts that handle the most common file tasks. Every example uses the standard library (plus watchdog for real-time monitoring). Copy, adapt, automate.

1. Auto-Organize Files by Type

Sort files into folders by extension, with configurable rules, dry-run mode, and a move log for undo operations.

#!/usr/bin/env python3
"""Auto-organize files by extension with undo support."""
import json
import shutil
from pathlib import Path
from datetime import datetime

# Folder name → extensions routed there; anything unmatched lands in "Other".
RULES = {
    "Images":     {".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg", ".bmp", ".ico"},
    "Documents":  {".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".txt", ".md"},
    "Code":       {".py", ".js", ".ts", ".go", ".rs", ".java", ".c", ".cpp", ".h", ".rb", ".sh"},
    "Data":       {".json", ".csv", ".xml", ".yaml", ".yml", ".toml", ".sql", ".parquet"},
    "Archives":   {".zip", ".tar", ".gz", ".bz2", ".7z", ".rar", ".xz"},
    "Audio":      {".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac"},
    "Video":      {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv"},
}

def organize(source_dir: str, dry_run: bool = False, log_file: str = "organize_log.json"):
    """Sort the top-level files of *source_dir* into subfolders by extension.

    Args:
        source_dir: Directory whose files get organized (non-recursive).
        dry_run: If True, only print the planned moves; nothing changes.
        log_file: JSON move log kept inside source_dir, consumed by undo().

    Returns:
        List of {"from": ..., "to": ...} dicts for moves actually performed
        (always empty in dry-run mode).

    Raises:
        FileNotFoundError: If source_dir does not exist.
    """
    source = Path(source_dir)
    if not source.is_dir():
        raise FileNotFoundError(f"Directory not found: {source}")

    # Invert RULES once: extension → destination folder name.
    ext_to_folder = {ext: folder for folder, exts in RULES.items() for ext in exts}

    moves = []
    planned = 0  # would-be moves in dry-run mode
    for item in source.iterdir():
        if not item.is_file() or item.name == log_file:
            continue
        folder_name = ext_to_folder.get(item.suffix.lower(), "Other")
        dest_dir = source / folder_name
        dest_file = dest_dir / item.name

        # Handle name collisions with a timestamp suffix
        if dest_file.exists():
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            dest_file = dest_dir / f"{item.stem}_{ts}{item.suffix}"

        if dry_run:
            planned += 1
            print(f"  [DRY] {item.name} → {folder_name}/")
        else:
            dest_dir.mkdir(exist_ok=True)
            shutil.move(str(item), str(dest_file))
            moves.append({"from": str(item), "to": str(dest_file)})
            print(f"  ✓ {item.name} → {folder_name}/")

    if moves and not dry_run:
        # Append to any existing log so repeated runs stay undoable.
        log_path = source / log_file
        existing = json.loads(log_path.read_text()) if log_path.exists() else []
        existing.extend(moves)
        log_path.write_text(json.dumps(existing, indent=2))

    # BUG FIX: dry runs previously reported "Organized 0 files" because only
    # real moves were counted — report the planned count instead.
    count = planned if dry_run else len(moves)
    print(f"\n{'[DRY RUN] ' if dry_run else ''}Organized {count} files.")
    return moves

def undo(source_dir: str, log_file: str = "organize_log.json"):
    """Reverse every move recorded in the organize log, then delete the log.

    Args:
        source_dir: Directory that was previously organized.
        log_file: Name of the JSON move log written by organize().
    """
    log_path = Path(source_dir) / log_file
    if not log_path.exists():
        print("No log file found. Nothing to undo.")
        return

    moves = json.loads(log_path.read_text())
    restored = 0
    # Newest first, so chained moves unwind in the correct order.
    for move in reversed(moves):
        src, dst = Path(move["to"]), Path(move["from"])
        if src.exists():
            shutil.move(str(src), str(dst))
            restored += 1
            print(f"  ↩ {src.name} → {dst.parent.name}/")

    log_path.unlink()
    # BUG FIX: report the moves actually reversed, not the log length —
    # entries whose source vanished are skipped above.
    print(f"Undid {restored} moves.")

if __name__ == "__main__":
    import sys

    # CLI: organize.py <directory> [--dry-run] [--undo]
    args = sys.argv[1:]
    if not args:
        print("Usage: organize.py <directory> [--dry-run] [--undo]")
        sys.exit(1)

    directory = args[0]
    if "--undo" in args:
        undo(directory)
    else:
        organize(directory, dry_run="--dry-run" in args)

Run with --dry-run first to preview changes. The JSON log tracks every move so --undo restores everything to its original location.

2. Real-Time File Monitoring with Watchdog

Instead of running a script manually, watch a folder and react to changes instantly. Great for auto-organizing downloads or triggering builds.

#!/usr/bin/env python3
"""Watch a directory and auto-organize new files. Requires: pip install watchdog"""
import time
import shutil
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Flat extension → destination-folder map for the watcher.
# Extensions not listed here are left untouched by on_created().
RULES = {
    ".pdf": "Documents", ".doc": "Documents", ".docx": "Documents",
    ".jpg": "Images", ".jpeg": "Images", ".png": "Images", ".gif": "Images",
    ".mp4": "Video", ".mkv": "Video", ".avi": "Video",
    ".zip": "Archives", ".tar": "Archives", ".gz": "Archives",
    ".py": "Code", ".js": "Code", ".ts": "Code",
}

class FileOrganizer(FileSystemEventHandler):
    """Watchdog handler that moves newly created files into folders by extension."""

    def __init__(self, watch_dir: str, cooldown: float = 2.0):
        """
        Args:
            watch_dir: Directory being watched; destination folders are
                created inside it as needed.
            cooldown: Seconds during which repeated events for the same
                filename are ignored (debounce).
        """
        self.watch_dir = Path(watch_dir)
        self.cooldown = cooldown
        self._recent = {}  # filename → last-seen timestamp (pruned per event)

    def on_created(self, event):
        if event.is_directory:
            return
        path = Path(event.src_path)

        # Debounce: browsers create temp files first
        now = time.time()
        last = self._recent.get(path.name)
        if last is not None and now - last < self.cooldown:
            return
        self._recent[path.name] = now
        # BUG FIX: drop stale entries so the debounce map can't grow forever
        # on a long-running watcher.
        self._recent = {name: ts for name, ts in self._recent.items()
                        if now - ts < self.cooldown}

        # Wait for file to finish writing (heuristic — large downloads may
        # need longer; presumably the OS has flushed within 1s for most files)
        time.sleep(1)
        if not path.exists():
            return

        folder = RULES.get(path.suffix.lower())
        if folder:
            dest_dir = self.watch_dir / folder
            dest_dir.mkdir(exist_ok=True)
            dest = dest_dir / path.name
            if dest.exists():
                # BUG FIX: a name collision used to silently leave the file
                # unorganized; suffix with a timestamp instead (mirrors the
                # collision handling in the batch organizer).
                dest = dest_dir / f"{path.stem}_{int(now)}{path.suffix}"
            shutil.move(str(path), str(dest))
            print(f"  ✓ {path.name} → {folder}/")

def watch(directory: str):
    """Block forever, auto-organizing new files that appear in *directory*."""
    handler = FileOrganizer(directory)
    obs = Observer()
    obs.schedule(handler, directory, recursive=False)
    obs.start()
    print(f"Watching {directory} for new files... (Ctrl+C to stop)")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        obs.stop()
    obs.join()

if __name__ == "__main__":
    import sys

    # Default to the user's Downloads folder when no directory is given.
    default_dir = str(Path.home() / "Downloads")
    watch(sys.argv[1] if len(sys.argv) > 1 else default_dir)

Start this as a background process or system service. Every new file in your Downloads folder gets sorted automatically.

3. Bulk Rename with Patterns

Rename hundreds of files using regex, date stamps, or sequential numbering. Supports preview mode and collision-safe naming.

#!/usr/bin/env python3
"""Bulk rename files with regex, dates, or sequential numbering."""
import re
from pathlib import Path
from datetime import datetime

def rename_regex(directory: str, pattern: str, replacement: str,
                 dry_run: bool = True, extensions: set = None):
    """Rename files whose stem matches a regex pattern.

    Only the stem is rewritten; the extension is preserved. Returns a list
    of (old_path, new_path) string tuples for every planned or applied rename.

    Examples:
        rename_regex("./photos", r"IMG_(\d+)", r"photo_\1")
        rename_regex("./docs", r"report_v\d+", "report_final", extensions={".pdf"})
    """
    root = Path(directory)
    results = []

    for entry in sorted(root.iterdir()):
        if not entry.is_file():
            continue
        if extensions and entry.suffix.lower() not in extensions:
            continue

        stem = re.sub(pattern, replacement, entry.stem)
        if stem == entry.stem:
            continue  # pattern didn't change this name — leave it alone

        candidate = entry.parent / (stem + entry.suffix)

        # Collision safety: append _1, _2, ... until the target name is free.
        bump = 0
        while candidate.exists() and candidate != entry:
            bump += 1
            candidate = entry.parent / f"{stem}_{bump}{entry.suffix}"

        tag = "[DRY]" if dry_run else "✓"
        if not dry_run:
            entry.rename(candidate)
        print(f"  {tag} {entry.name} → {candidate.name}")
        results.append((str(entry), str(candidate)))

    return results

def rename_sequential(directory: str, prefix: str = "file",
                      start: int = 1, pad: int = 3,
                      extensions: set = None, dry_run: bool = True):
    """Rename files with sequential numbers: photo_001.jpg, photo_002.jpg, ...

    Files are numbered in modification-time order. Renames happen in two
    phases (everything is parked under a temporary name first) so a target
    name currently held by another file in the batch is never overwritten.

    Returns:
        List of (old_path, new_path) string tuples.
    """
    target = Path(directory)
    files = sorted(
        [f for f in target.iterdir()
         if f.is_file() and (not extensions or f.suffix.lower() in extensions)],
        key=lambda f: f.stat().st_mtime
    )

    # Plan all final names up front.
    plan = []
    for i, item in enumerate(files, start=start):
        new_name = f"{prefix}_{str(i).zfill(pad)}{item.suffix}"
        plan.append((item, target / new_name))

    renamed = []
    if dry_run:
        for item, new_path in plan:
            print(f"  [DRY] {item.name} → {new_path.name}")
            renamed.append((str(item), str(new_path)))
        return renamed

    # Phase 1: park every file under a unique temp name. BUG FIX: renaming
    # directly could silently clobber a batch file already holding a target
    # name (Path.rename overwrites existing files on POSIX).
    parked = []
    for idx, (item, new_path) in enumerate(plan):
        tmp = target / f".seqtmp_{idx}_{item.name}"
        item.rename(tmp)
        parked.append((item, tmp, new_path))

    # Phase 2: move from temp names to final names.
    for item, tmp, new_path in parked:
        final = new_path
        bump = 0
        # Any remaining occupant is a foreign file outside the batch.
        while final.exists():
            bump += 1
            final = target / f"{new_path.stem}_{bump}{new_path.suffix}"
        tmp.rename(final)
        print(f"  ✓ {item.name} → {final.name}")
        renamed.append((str(item), str(final)))

    return renamed

def rename_by_date(directory: str, fmt: str = "%Y-%m-%d",
                   use_exif: bool = False, dry_run: bool = True):
    """Rename files using their modification date.

    NOTE(review): use_exif is accepted for API compatibility but EXIF
    reading is not implemented — mtime is always used. TODO: wire up EXIF.

    Returns:
        List of (old_path, new_path) string tuples, matching the other
        rename helpers.
    """
    target = Path(directory)
    date_counts = {}
    renamed = []

    for item in sorted(target.iterdir(), key=lambda f: f.stat().st_mtime):
        if not item.is_file():
            continue

        mtime = datetime.fromtimestamp(item.stat().st_mtime)
        date_str = mtime.strftime(fmt)

        # Track duplicates per (date, extension) pair
        key = (date_str, item.suffix)
        count = date_counts.get(key, 0) + 1

        suffix = f"_{count}" if count > 1 else ""
        new_path = target / f"{date_str}{suffix}{item.suffix}"
        # BUG FIX: skip past names already taken on disk instead of letting
        # Path.rename silently overwrite an existing file.
        while new_path.exists() and new_path != item:
            count += 1
            new_path = target / f"{date_str}_{count}{item.suffix}"
        date_counts[key] = count

        if dry_run:
            print(f"  [DRY] {item.name} → {new_path.name}")
        else:
            item.rename(new_path)
            print(f"  ✓ {item.name} → {new_path.name}")
        renamed.append((str(item), str(new_path)))

    return renamed

if __name__ == "__main__":
    import sys

    # CLI dispatch: regex / seq / date, preview unless --apply is given.
    argv = sys.argv
    if len(argv) < 2:
        print("Usage:")
        print("  rename.py regex <dir> <pattern> <replacement> [--apply]")
        print("  rename.py seq <dir> <prefix> [--apply]")
        print("  rename.py date <dir> [--apply]")
        sys.exit(1)

    mode, preview = argv[1], "--apply" not in argv
    if mode == "regex" and len(argv) >= 5:
        rename_regex(argv[2], argv[3], argv[4], dry_run=preview)
    elif mode == "seq" and len(argv) >= 4:
        rename_sequential(argv[2], prefix=argv[3], dry_run=preview)
    elif mode == "date" and len(argv) >= 3:
        rename_by_date(argv[2], dry_run=preview)

Three modes in one script. Always previews first — pass --apply to commit changes. See our automation scripts article for more utility patterns.

4. Incremental File Backup

A backup script that only copies changed files (hash-based deduplication), compresses archives, and rotates old backups.

#!/usr/bin/env python3
"""Incremental backup with hash tracking, compression, and rotation."""
import hashlib
import json
import shutil
import tarfile
from pathlib import Path
from datetime import datetime

def file_hash(path: Path, algorithm: str = "sha256") -> str:
    """Return the hex digest of a file's contents, streamed in 8 KB chunks."""
    digest = hashlib.new(algorithm)
    with open(path, "rb") as fh:
        read = fh.read
        chunk = read(8192)
        while chunk:
            digest.update(chunk)
            chunk = read(8192)
    return digest.hexdigest()

class IncrementalBackup:
    """Hash-based incremental backup with optional compression and rotation.

    A JSON state file (.backup_state.json) in backup_dir records the hash of
    every source file after each run; the next run backs up only files whose
    hash changed or that are new.

    NOTE(review): each backup holds only that run's changed files, so a full
    restore needs every backup since the first; rotation in _rotate() deletes
    the oldest ones and can therefore discard data needed for a complete
    restore — confirm this trade-off is acceptable before relying on it.
    """

    def __init__(self, source: str, backup_dir: str, max_backups: int = 10):
        # backup_dir is created if missing; the state file lives alongside
        # the backups themselves.
        self.source = Path(source)
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.state_file = self.backup_dir / ".backup_state.json"
        self.max_backups = max_backups
        self.state = self._load_state()

    def _load_state(self) -> dict:
        """Return the persisted state, or an empty one on first run."""
        if self.state_file.exists():
            return json.loads(self.state_file.read_text())
        return {"files": {}, "backups": []}

    def _save_state(self):
        """Persist the file-hash map and backup history to disk."""
        self.state_file.write_text(json.dumps(self.state, indent=2))

    def run(self, compress: bool = True, exclude: set = None):
        """Create one incremental backup of files changed since the last run.

        Args:
            compress: Pack changed files into a .tar.gz archive; otherwise
                copy them into a timestamped directory.
            exclude: Path components to skip anywhere in a file's path.

        Returns:
            The new backup's name, or None when nothing changed.
        """
        exclude = exclude or {".git", "__pycache__", "node_modules", ".venv"}
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Find changed files by re-hashing the tree and diffing against state
        changed = []
        current_files = {}

        for item in self.source.rglob("*"):
            if item.is_dir():
                continue
            if any(part in exclude for part in item.parts):
                continue

            rel = str(item.relative_to(self.source))
            h = file_hash(item)
            current_files[rel] = h

            # Unknown path or different content → needs backing up
            if self.state["files"].get(rel) != h:
                changed.append(item)

        # Detect deleted files (recorded in the log; nothing is removed)
        deleted = set(self.state["files"]) - set(current_files)

        if not changed and not deleted:
            print("No changes detected. Skipping backup.")
            return None

        print(f"Changes: {len(changed)} modified/new, {len(deleted)} deleted")

        # Create backup
        backup_name = f"backup_{timestamp}"
        if compress:
            archive_path = self.backup_dir / f"{backup_name}.tar.gz"
            with tarfile.open(archive_path, "w:gz") as tar:
                for item in changed:
                    # Store paths relative to the source root in the archive
                    arcname = str(item.relative_to(self.source))
                    tar.add(item, arcname=arcname)
            size_mb = archive_path.stat().st_size / (1024 * 1024)
            print(f"Created: {archive_path.name} ({size_mb:.1f} MB, {len(changed)} files)")
        else:
            backup_path = self.backup_dir / backup_name
            backup_path.mkdir()
            for item in changed:
                rel = item.relative_to(self.source)
                dest = backup_path / rel
                dest.parent.mkdir(parents=True, exist_ok=True)
                # copy2 preserves timestamps/permissions metadata
                shutil.copy2(item, dest)
            print(f"Created: {backup_path.name} ({len(changed)} files)")

        # Update state to the current snapshot (deleted entries drop out)
        self.state["files"] = current_files
        self.state["backups"].append({
            "name": backup_name,
            "timestamp": timestamp,
            "files_changed": len(changed),
            "files_deleted": len(deleted),
            "compressed": compress,
        })
        self._save_state()

        # Rotate old backups
        self._rotate()
        return backup_name

    def _rotate(self):
        """Delete the oldest backups until at most max_backups remain."""
        backups = self.state["backups"]
        while len(backups) > self.max_backups:
            old = backups.pop(0)
            old_path = self.backup_dir / old["name"]
            # A backup is either a .tar.gz archive or a plain directory
            for ext in [".tar.gz", ""]:
                p = Path(str(old_path) + ext)
                if p.exists():
                    if p.is_dir():
                        shutil.rmtree(p)
                    else:
                        p.unlink()
                    print(f"  Rotated: {p.name}")
        self._save_state()

    def list_backups(self):
        """Print one summary line per recorded backup."""
        for b in self.state["backups"]:
            print(f"  {b['name']} — {b['files_changed']} changed, {b['files_deleted']} deleted")

if __name__ == "__main__":
    import sys

    # CLI: backup.py <source_dir> <backup_dir> [--no-compress] [--list]
    if len(sys.argv) < 3:
        print("Usage: backup.py <source_dir> <backup_dir> [--no-compress] [--list]")
        sys.exit(1)

    job = IncrementalBackup(sys.argv[1], sys.argv[2])
    if "--list" in sys.argv:
        job.list_backups()
    else:
        job.run(compress="--no-compress" not in sys.argv)

Pair this with cron or APScheduler for fully automated daily backups. The hash-based approach means only actual changes get backed up — no wasted space.

5. Duplicate File Detector

Find duplicate files across directories using content hashing. Groups duplicates by hash and reports wasted space.

#!/usr/bin/env python3
"""Find duplicate files by content hash. Reports groups and wasted space."""
import hashlib
import sys
from pathlib import Path
from collections import defaultdict

def file_hash(path: Path, quick: bool = False) -> str:
    """Hash file contents.

    Quick mode hashes the file size plus the first (and, for files larger
    than 8 KB, the last) 4 KB — fast but approximate; full mode streams the
    whole file through SHA-256.
    """
    if not quick:
        digest = hashlib.sha256()
        with open(path, "rb") as fh:
            read = fh.read
            block = read(8192)
            while block:
                digest.update(block)
                block = read(8192)
        return digest.hexdigest()

    size = path.stat().st_size
    digest = hashlib.md5(str(size).encode())
    with open(path, "rb") as fh:
        digest.update(fh.read(4096))
        if size > 8192:
            fh.seek(-4096, 2)  # jump to the last 4 KB
            digest.update(fh.read(4096))
    return digest.hexdigest()

def find_duplicates(directories: list, min_size: int = 1024,
                    exclude: set = None, quick: bool = True) -> dict:
    """Scan directories for duplicate files.

    Two-pass approach for speed:
    1. Group by file size (fast filter — unique sizes can't be duplicates)
    2. Hash only files with matching sizes; in quick mode, confirm candidate
       groups with a full SHA-256 pass at the end.

    Args:
        directories: Directories to scan recursively.
        min_size: Ignore files smaller than this many bytes.
        exclude: Directory/file names to skip anywhere in the path.
        quick: Use the fast boundary hash first, then verify with SHA-256.

    Returns:
        Mapping of content hash → list of Paths sharing that content.
    """
    exclude = exclude or {".git", "__pycache__", "node_modules", ".venv"}

    # Pass 1: Group by size
    size_groups = defaultdict(list)
    total_scanned = 0

    for directory in directories:
        for item in Path(directory).rglob("*"):
            if not item.is_file() or item.is_symlink():
                continue
            if any(part in exclude for part in item.parts):
                continue
            size = item.stat().st_size
            if size >= min_size:
                size_groups[size].append(item)
                total_scanned += 1

    print(f"Scanned {total_scanned} files")

    # Pass 2: Hash files with duplicate sizes
    candidates = {s: files for s, files in size_groups.items() if len(files) > 1}
    print(f"Size matches: {sum(len(f) for f in candidates.values())} files in {len(candidates)} groups")

    hash_groups = defaultdict(list)
    for files in candidates.values():
        for f in files:
            try:
                hash_groups[file_hash(f, quick=quick)].append(f)
            # FIX: PermissionError is an OSError subclass — one catch suffices
            except OSError:
                continue

    # Filter to actual duplicates
    duplicates = {h: files for h, files in hash_groups.items() if len(files) > 1}

    # Quick hashes are approximate; re-hash candidate groups with full SHA-256
    if quick and duplicates:
        verified = defaultdict(list)
        for files in duplicates.values():
            for f in files:
                try:
                    verified[file_hash(f, quick=False)].append(f)
                except OSError:
                    continue
        duplicates = {h: files for h, files in verified.items() if len(files) > 1}

    return duplicates

def report(duplicates: dict):
    """Print a human-readable duplicate report."""
    if not duplicates:
        print("\n✅ No duplicates found!")
        return

    total_wasted = 0
    group_no = 0
    for files in duplicates.values():
        group_no += 1
        size = files[0].stat().st_size
        total_wasted += size * (len(files) - 1)

        print(f"\n--- Group {group_no} ({len(files)} copies, {size:,} bytes each) ---")
        for path in sorted(files, key=str):
            print(f"  {path}")

    wasted_mb = total_wasted / (1024 * 1024)
    print(f"\n📊 Summary: {len(duplicates)} duplicate groups, {wasted_mb:.1f} MB wasted")

if __name__ == "__main__":
    # Default to the current directory when no paths are given.
    roots = sys.argv[1:] or ["."]
    report(find_duplicates(roots))

The two-pass approach (size filter first, then hash) makes it fast even on large directories. Quick mode adds another speedup by hashing only each file's size plus its first and last 4 KB, then confirming real matches with a full SHA-256 pass.

Want the Complete File Automation Suite?

These scripts are from the AI Agent Toolkit — 50+ production-ready Python scripts covering file management, web scraping, API integration, data processing, DevOps, and more. Plus 30+ curated AI prompts.

Every script is standalone, documented, and ready to use.

Get the Toolkit — $19

6. Two-Way File Sync

Sync files between two directories with conflict detection. Useful for keeping project folders, config directories, or media libraries in sync across drives.

#!/usr/bin/env python3
"""Two-way file sync with conflict detection and resolution."""
import hashlib
import json
import shutil
from pathlib import Path
from datetime import datetime

def file_hash(path: Path) -> str:
    """SHA-256 hex digest of a file's contents, streamed in 8 KB chunks."""
    sha = hashlib.sha256()
    with open(path, "rb") as fh:
        read = fh.read
        block = read(8192)
        while block:
            sha.update(block)
            block = read(8192)
    return sha.hexdigest()

class FileSync:
    """Two-way directory sync with state-based deletion tracking.

    A JSON state file in dir_a records the content hash of every file at the
    end of the last sync. On the next run, "in state but missing on one side"
    distinguishes a deletion from a brand-new file, and the recorded hash
    distinguishes a clean deletion from a delete-vs-modify conflict.
    """

    def __init__(self, dir_a: str, dir_b: str):
        self.dir_a = Path(dir_a)
        self.dir_b = Path(dir_b)
        self.state_file = self.dir_a / ".sync_state.json"
        self.state = self._load_state()

    def _load_state(self) -> dict:
        """Load the last-sync snapshot, or an empty one on first run."""
        if self.state_file.exists():
            return json.loads(self.state_file.read_text())
        return {"files": {}}

    def _save_state(self):
        """Persist the post-sync snapshot."""
        self.state_file.write_text(json.dumps(self.state, indent=2))

    def _scan(self, root: Path, exclude: set) -> dict:
        """Map relative path → {hash, mtime, size} for every file under root."""
        files = {}
        for item in root.rglob("*"):
            # Skip directories and our own state files (".sync*")
            if item.is_dir() or item.name.startswith(".sync"):
                continue
            if any(part in exclude for part in item.parts):
                continue
            rel = str(item.relative_to(root))
            files[rel] = {
                "hash": file_hash(item),
                "mtime": item.stat().st_mtime,
                "size": item.stat().st_size,
            }
        return files

    def sync(self, dry_run: bool = True, exclude: set = None,
             conflict: str = "newer"):
        """Sync two directories.

        conflict: "newer" (keep newer mtime), "a" (prefer dir_a), "b" (prefer dir_b)
        """
        exclude = exclude or {".git", "__pycache__", "node_modules"}
        files_a = self._scan(self.dir_a, exclude)
        files_b = self._scan(self.dir_b, exclude)
        prev = self.state["files"]

        all_files = set(files_a) | set(files_b) | set(prev)
        actions = []

        for rel in sorted(all_files):
            in_a = rel in files_a
            in_b = rel in files_b
            in_prev = rel in prev

            if in_a and not in_b:
                if in_prev and files_a[rel]["hash"] == prev[rel]:
                    # Unchanged since last sync, deleted from B → delete from A
                    actions.append(("delete_a", rel, "deleted in B"))
                elif in_prev:
                    # BUG FIX: the file was modified in A after B deleted it;
                    # propagating the delete would destroy the edits — keep
                    # the modified copy and re-sync it instead.
                    actions.append(("copy_a_to_b", rel, "modified in A, deleted in B"))
                else:
                    # New in A → copy to B
                    actions.append(("copy_a_to_b", rel, "new in A"))

            elif not in_a and in_b:
                if in_prev and files_b[rel]["hash"] == prev[rel]:
                    actions.append(("delete_b", rel, "deleted in A"))
                elif in_prev:
                    actions.append(("copy_b_to_a", rel, "modified in B, deleted in A"))
                else:
                    actions.append(("copy_b_to_a", rel, "new in B"))

            elif in_a and in_b:
                if files_a[rel]["hash"] != files_b[rel]["hash"]:
                    # Conflict: both sides differ
                    if conflict == "newer":
                        if files_a[rel]["mtime"] >= files_b[rel]["mtime"]:
                            actions.append(("copy_a_to_b", rel, "newer in A"))
                        else:
                            actions.append(("copy_b_to_a", rel, "newer in B"))
                    elif conflict == "a":
                        actions.append(("copy_a_to_b", rel, "prefer A"))
                    else:
                        actions.append(("copy_b_to_a", rel, "prefer B"))

        if not actions:
            print("Directories are in sync.")
            return

        for action, rel, reason in actions:
            if action == "copy_a_to_b":
                src, dst = self.dir_a / rel, self.dir_b / rel
            elif action == "copy_b_to_a":
                src, dst = self.dir_b / rel, self.dir_a / rel
            elif action.startswith("delete"):
                # Pick the side explicitly rather than substring-matching
                target = self.dir_a / rel if action == "delete_a" else self.dir_b / rel
                tag = "[DRY] " if dry_run else ""
                print(f"  {tag}🗑 {rel} ({reason})")
                if not dry_run and target.exists():
                    target.unlink()
                continue
            else:
                continue

            tag = "[DRY] " if dry_run else ""
            print(f"  {tag}→ {rel} ({reason})")
            if not dry_run:
                dst.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src, dst)

        if not dry_run:
            # Record the post-sync snapshot. Start from the pre-sync scans,
            # then overwrite with the hash of the content that actually
            # survived each action, and drop deleted entries entirely.
            merged = {}
            for rel in set(files_a) | set(files_b):
                a_info = files_a.get(rel)
                b_info = files_b.get(rel)
                merged[rel] = (a_info or b_info)["hash"]
            for action, rel, _reason in actions:
                if action == "copy_a_to_b":
                    merged[rel] = files_a[rel]["hash"]
                elif action == "copy_b_to_a":
                    # BUG FIX: the old code always stored A's pre-sync hash,
                    # so conflict losers left a stale hash in the state.
                    merged[rel] = files_b[rel]["hash"]
                elif action.startswith("delete"):
                    # BUG FIX: deleted files used to linger in the state forever
                    merged.pop(rel, None)
            self.state["files"] = merged
            self._save_state()

        print(f"\n{'[DRY RUN] ' if dry_run else ''}{len(actions)} actions")

if __name__ == "__main__":
    import sys

    # CLI: sync.py <dir_a> <dir_b> [--apply] [--prefer a|b|newer]
    if len(sys.argv) < 3:
        print("Usage: sync.py <dir_a> <dir_b> [--apply] [--prefer a|b|newer]")
        sys.exit(1)

    conflict_mode = "newer"
    if "--prefer" in sys.argv:
        pos = sys.argv.index("--prefer")
        if pos + 1 < len(sys.argv):
            conflict_mode = sys.argv[pos + 1]

    FileSync(sys.argv[1], sys.argv[2]).sync(
        dry_run="--apply" not in sys.argv, conflict=conflict_mode)

The state file tracks what was synced last time, so it correctly handles deletions (a file removed from one side gets removed from the other, not re-copied back). See the database operations guide for more on state management patterns.

Putting It All Together

Here's a practical workflow combining these tools:

#!/usr/bin/env python3
"""Weekly file maintenance: organize, deduplicate, backup."""
from pathlib import Path

# Import the scripts above (or inline them)
# from organize import organize
# from duplicates import find_duplicates, report
# from backup import IncrementalBackup

def weekly_maintenance():
    """Run the weekly pipeline: organize downloads, deduplicate, back up.

    NOTE(review): organize(), find_duplicates(), report() and
    IncrementalBackup are defined in the scripts above — enable the
    commented imports (or inline those definitions) before running, or
    this function raises NameError.
    """
    home = Path.home()

    # 1. Organize downloads
    print("=== Step 1: Organize Downloads ===")
    organize(str(home / "Downloads"))

    # 2. Find duplicates in common dirs
    print("\n=== Step 2: Find Duplicates ===")
    dupes = find_duplicates([
        str(home / "Documents"),
        str(home / "Downloads"),
    ])
    report(dupes)

    # 3. Backup important dirs (skips any that don't exist on this machine)
    print("\n=== Step 3: Incremental Backup ===")
    for source in ["Documents", "Projects", "Photos"]:
        src = home / source
        if src.exists():
            backup = IncrementalBackup(str(src), str(home / "Backups" / source))
            backup.run()

    print("\n✅ Weekly maintenance complete!")

# Entry point: run the full weekly pipeline when executed directly.
if __name__ == "__main__":
    weekly_maintenance()

Automate Everything

The AI Agent Toolkit includes these scripts plus 45+ more — web scraping, API automation, data processing, CLI tools, email automation, and more. All production-ready, all standalone.

Get the Full Toolkit — $19