The average developer spends hours every week on file management — sorting downloads, renaming screenshots, backing up project folders, hunting duplicates. All of it is scriptable.
This guide covers six production-ready Python scripts that handle the most common file tasks. Every example uses the standard library (plus watchdog for real-time monitoring). Copy, adapt, automate.
Sort files into folders by extension, with configurable rules, dry-run mode, and a move log for undo operations.
#!/usr/bin/env python3
"""Auto-organize files by extension with undo support."""
import json
import shutil
from pathlib import Path
from datetime import datetime
RULES = {
"Images": {".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg", ".bmp", ".ico"},
"Documents": {".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".txt", ".md"},
"Code": {".py", ".js", ".ts", ".go", ".rs", ".java", ".c", ".cpp", ".h", ".rb", ".sh"},
"Data": {".json", ".csv", ".xml", ".yaml", ".yml", ".toml", ".sql", ".parquet"},
"Archives": {".zip", ".tar", ".gz", ".bz2", ".7z", ".rar", ".xz"},
"Audio": {".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac"},
"Video": {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv"},
}
def organize(source_dir: str, dry_run: bool = False, log_file: str = "organize_log.json"):
source = Path(source_dir)
if not source.is_dir():
raise FileNotFoundError(f"Directory not found: {source}")
ext_to_folder = {}
for folder, exts in RULES.items():
for ext in exts:
ext_to_folder[ext] = folder
moves = []
for item in source.iterdir():
if item.is_file() and item.name != log_file:
folder_name = ext_to_folder.get(item.suffix.lower(), "Other")
dest_dir = source / folder_name
dest_file = dest_dir / item.name
# Handle name collisions
if dest_file.exists():
stem = item.stem
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
dest_file = dest_dir / f"{stem}_{ts}{item.suffix}"
            if dry_run:
                print(f" [DRY] {item.name} → {folder_name}/")
            else:
                dest_dir.mkdir(exist_ok=True)
                shutil.move(str(item), str(dest_file))
                print(f" ✓ {item.name} → {folder_name}/")
            # Record the move (planned or done) so the summary count is right in dry runs too
            moves.append({"from": str(item), "to": str(dest_file)})
if moves and not dry_run:
log_path = source / log_file
existing = json.loads(log_path.read_text()) if log_path.exists() else []
existing.extend(moves)
log_path.write_text(json.dumps(existing, indent=2))
print(f"\n{'[DRY RUN] ' if dry_run else ''}Organized {len(moves)} files.")
return moves
def undo(source_dir: str, log_file: str = "organize_log.json"):
log_path = Path(source_dir) / log_file
if not log_path.exists():
print("No log file found. Nothing to undo.")
return
moves = json.loads(log_path.read_text())
for move in reversed(moves):
src, dst = Path(move["to"]), Path(move["from"])
if src.exists():
shutil.move(str(src), str(dst))
print(f" ↩ {src.name} → {dst.parent.name}/")
log_path.unlink()
print(f"Undid {len(moves)} moves.")
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: organize.py <directory> [--dry-run] [--undo]")
sys.exit(1)
target = sys.argv[1]
if "--undo" in sys.argv:
undo(target)
else:
organize(target, dry_run="--dry-run" in sys.argv)
Run with --dry-run first to preview changes. The JSON log tracks every move so --undo restores everything to its original location.
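You can also call it from Python instead of the command line; a minimal sketch, assuming the script above is saved as organize.py (as its usage string suggests) and with a placeholder path:
from organize import organize, undo
organize("/home/me/Downloads", dry_run=True)  # preview: prints planned moves, touches nothing
organize("/home/me/Downloads")                # apply: moves files and writes organize_log.json
undo("/home/me/Downloads")                    # replay the log in reverse to restore everything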
Instead of running a script manually, watch a folder and react to changes instantly. Great for auto-organizing downloads or triggering builds.
#!/usr/bin/env python3
"""Watch a directory and auto-organize new files. Requires: pip install watchdog"""
import time
import shutil
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
RULES = {
".pdf": "Documents", ".doc": "Documents", ".docx": "Documents",
".jpg": "Images", ".jpeg": "Images", ".png": "Images", ".gif": "Images",
".mp4": "Video", ".mkv": "Video", ".avi": "Video",
".zip": "Archives", ".tar": "Archives", ".gz": "Archives",
".py": "Code", ".js": "Code", ".ts": "Code",
}
class FileOrganizer(FileSystemEventHandler):
def __init__(self, watch_dir: str, cooldown: float = 2.0):
self.watch_dir = Path(watch_dir)
self.cooldown = cooldown
self._recent = {}
def on_created(self, event):
if event.is_directory:
return
path = Path(event.src_path)
# Debounce: browsers create temp files first
now = time.time()
if path.name in self._recent and now - self._recent[path.name] < self.cooldown:
return
self._recent[path.name] = now
# Wait for file to finish writing
time.sleep(1)
if not path.exists():
return
folder = RULES.get(path.suffix.lower())
if folder:
dest_dir = self.watch_dir / folder
dest_dir.mkdir(exist_ok=True)
dest = dest_dir / path.name
if not dest.exists():
shutil.move(str(path), str(dest))
print(f" ✓ {path.name} → {folder}/")
def watch(directory: str):
observer = Observer()
observer.schedule(FileOrganizer(directory), directory, recursive=False)
observer.start()
print(f"Watching {directory} for new files... (Ctrl+C to stop)")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
import sys
watch(sys.argv[1] if len(sys.argv) > 1 else str(Path.home() / "Downloads"))
Start this as a background process or system service. Every new file in your Downloads folder gets sorted automatically.
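One tweak worth considering, sketched below as a method you could add to the FileOrganizer class above (not part of the original script): browsers typically download to a temporary name such as .part or .crdownload and rename the file once it finishes, which watchdog reports as a move event rather than a create event, so handling on_moved catches those files too.
    def on_moved(self, event):
        # Temp downloads (.part/.crdownload) are renamed when complete,
        # arriving here as a move event instead of a create event
        if event.is_directory:
            return
        path = Path(event.dest_path)
        folder = RULES.get(path.suffix.lower())
        if folder and path.exists():
            dest_dir = self.watch_dir / folder
            dest_dir.mkdir(exist_ok=True)
            dest = dest_dir / path.name
            if not dest.exists():
                shutil.move(str(path), str(dest))
                print(f" ✓ {path.name} → {folder}/")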
Rename hundreds of files using regex, date stamps, or sequential numbering. Supports preview mode and collision-safe naming.
#!/usr/bin/env python3
"""Bulk rename files with regex, dates, or sequential numbering."""
import re
from pathlib import Path
from datetime import datetime
def rename_regex(directory: str, pattern: str, replacement: str,
dry_run: bool = True, extensions: set = None):
"""Rename files matching a regex pattern.
Examples:
rename_regex("./photos", r"IMG_(\d+)", r"photo_\1")
rename_regex("./docs", r"report_v\d+", "report_final", extensions={".pdf"})
"""
target = Path(directory)
renamed = []
for item in sorted(target.iterdir()):
if not item.is_file():
continue
if extensions and item.suffix.lower() not in extensions:
continue
new_stem = re.sub(pattern, replacement, item.stem)
if new_stem == item.stem:
continue
new_name = new_stem + item.suffix
new_path = item.parent / new_name
# Collision safety
counter = 1
while new_path.exists() and new_path != item:
new_path = item.parent / f"{new_stem}_{counter}{item.suffix}"
counter += 1
if dry_run:
print(f" [DRY] {item.name} → {new_path.name}")
else:
item.rename(new_path)
print(f" ✓ {item.name} → {new_path.name}")
renamed.append((str(item), str(new_path)))
return renamed
def rename_sequential(directory: str, prefix: str = "file",
start: int = 1, pad: int = 3,
extensions: set = None, dry_run: bool = True):
"""Rename files with sequential numbers: photo_001.jpg, photo_002.jpg, ..."""
target = Path(directory)
files = sorted(
[f for f in target.iterdir()
if f.is_file() and (not extensions or f.suffix.lower() in extensions)],
key=lambda f: f.stat().st_mtime
)
renamed = []
    for i, item in enumerate(files, start=start):
        new_name = f"{prefix}_{str(i).zfill(pad)}{item.suffix}"
        new_path = item.parent / new_name
        if new_path == item:
            continue  # already has the target name
        if new_path.exists():
            # Never overwrite: renaming in place can collide with files later in the list
            print(f" ✗ skipped {item.name}: {new_name} already exists")
            continue
        if dry_run:
            print(f" [DRY] {item.name} → {new_name}")
        else:
            item.rename(new_path)
            print(f" ✓ {item.name} → {new_name}")
        renamed.append((str(item), str(new_path)))
return renamed
def rename_by_date(directory: str, fmt: str = "%Y-%m-%d", dry_run: bool = True):
    """Rename files using their modification date, numbering same-day duplicates."""
target = Path(directory)
date_counts = {}
for item in sorted(target.iterdir(), key=lambda f: f.stat().st_mtime):
if not item.is_file():
continue
mtime = datetime.fromtimestamp(item.stat().st_mtime)
date_str = mtime.strftime(fmt)
# Track duplicates per date
key = (date_str, item.suffix)
date_counts[key] = date_counts.get(key, 0) + 1
count = date_counts[key]
suffix = f"_{count}" if count > 1 else ""
new_name = f"{date_str}{suffix}{item.suffix}"
new_path = item.parent / new_name
if dry_run:
print(f" [DRY] {item.name} → {new_name}")
else:
item.rename(new_path)
print(f" ✓ {item.name} → {new_name}")
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage:")
print(" rename.py regex <dir> <pattern> <replacement> [--apply]")
print(" rename.py seq <dir> <prefix> [--apply]")
print(" rename.py date <dir> [--apply]")
sys.exit(1)
mode = sys.argv[1]
dry = "--apply" not in sys.argv
if mode == "regex" and len(sys.argv) >= 5:
rename_regex(sys.argv[2], sys.argv[3], sys.argv[4], dry_run=dry)
elif mode == "seq" and len(sys.argv) >= 4:
rename_sequential(sys.argv[2], prefix=sys.argv[3], dry_run=dry)
elif mode == "date" and len(sys.argv) >= 3:
rename_by_date(sys.argv[2], dry_run=dry)
Three modes in one script. Always previews first — pass --apply to commit changes. See our automation scripts article for more utility patterns.
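If you import the functions rather than calling the CLI, example calls might look like the sketch below (paths and prefixes are placeholders; all three default to dry_run=True, so nothing is renamed until you pass dry_run=False):
from rename import rename_regex, rename_sequential, rename_by_date  # assumes the script is saved as rename.py
rename_regex("./photos", r"IMG_(\d+)", r"vacation_\1", extensions={".jpg", ".jpeg"})
rename_sequential("./screenshots", prefix="shot", pad=4, extensions={".png"})
rename_by_date("./scans", fmt="%Y-%m-%d_%H%M")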
A backup script that only copies changed files (hash-based deduplication), compresses archives, and rotates old backups.
#!/usr/bin/env python3
"""Incremental backup with hash tracking, compression, and rotation."""
import hashlib
import json
import shutil
import tarfile
from pathlib import Path
from datetime import datetime
def file_hash(path: Path, algorithm: str = "sha256") -> str:
h = hashlib.new(algorithm)
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
class IncrementalBackup:
def __init__(self, source: str, backup_dir: str, max_backups: int = 10):
self.source = Path(source)
self.backup_dir = Path(backup_dir)
self.backup_dir.mkdir(parents=True, exist_ok=True)
self.state_file = self.backup_dir / ".backup_state.json"
self.max_backups = max_backups
self.state = self._load_state()
def _load_state(self) -> dict:
if self.state_file.exists():
return json.loads(self.state_file.read_text())
return {"files": {}, "backups": []}
def _save_state(self):
self.state_file.write_text(json.dumps(self.state, indent=2))
def run(self, compress: bool = True, exclude: set = None):
exclude = exclude or {".git", "__pycache__", "node_modules", ".venv"}
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Find changed files
changed = []
current_files = {}
for item in self.source.rglob("*"):
if item.is_dir():
continue
if any(part in exclude for part in item.parts):
continue
rel = str(item.relative_to(self.source))
h = file_hash(item)
current_files[rel] = h
if self.state["files"].get(rel) != h:
changed.append(item)
# Detect deleted files
deleted = set(self.state["files"]) - set(current_files)
if not changed and not deleted:
print("No changes detected. Skipping backup.")
return None
print(f"Changes: {len(changed)} modified/new, {len(deleted)} deleted")
# Create backup
backup_name = f"backup_{timestamp}"
if compress:
archive_path = self.backup_dir / f"{backup_name}.tar.gz"
with tarfile.open(archive_path, "w:gz") as tar:
for item in changed:
arcname = str(item.relative_to(self.source))
tar.add(item, arcname=arcname)
size_mb = archive_path.stat().st_size / (1024 * 1024)
print(f"Created: {archive_path.name} ({size_mb:.1f} MB, {len(changed)} files)")
else:
backup_path = self.backup_dir / backup_name
backup_path.mkdir()
for item in changed:
rel = item.relative_to(self.source)
dest = backup_path / rel
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(item, dest)
print(f"Created: {backup_path.name} ({len(changed)} files)")
# Update state
self.state["files"] = current_files
self.state["backups"].append({
"name": backup_name,
"timestamp": timestamp,
"files_changed": len(changed),
"files_deleted": len(deleted),
"compressed": compress,
})
self._save_state()
# Rotate old backups
self._rotate()
return backup_name
def _rotate(self):
backups = self.state["backups"]
while len(backups) > self.max_backups:
old = backups.pop(0)
old_path = self.backup_dir / old["name"]
for ext in [".tar.gz", ""]:
p = Path(str(old_path) + ext)
if p.exists():
if p.is_dir():
shutil.rmtree(p)
else:
p.unlink()
print(f" Rotated: {p.name}")
self._save_state()
def list_backups(self):
for b in self.state["backups"]:
print(f" {b['name']} — {b['files_changed']} changed, {b['files_deleted']} deleted")
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print("Usage: backup.py <source_dir> <backup_dir> [--no-compress] [--list]")
sys.exit(1)
backup = IncrementalBackup(sys.argv[1], sys.argv[2])
if "--list" in sys.argv:
backup.list_backups()
else:
backup.run(compress="--no-compress" not in sys.argv)
Pair this with cron or APScheduler for fully automated daily backups. The hash-based approach means only actual changes get backed up, so no space is wasted. One caveat: each archive holds only the files changed since the previous run, so rotating old backups can discard the only remaining copy of a file that hasn't changed recently; keep max_backups generous or take an occasional full snapshot.
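If you would rather schedule it from Python than from crontab, a minimal APScheduler sketch (assuming pip install apscheduler and that the backup script above is saved as backup.py, per its usage string) could look like this:
#!/usr/bin/env python3
"""Nightly incremental backup at 02:00. Requires: pip install apscheduler"""
from pathlib import Path
from apscheduler.schedulers.blocking import BlockingScheduler
from backup import IncrementalBackup  # the backup script above, saved as backup.py
def nightly_backup():
    home = Path.home()
    IncrementalBackup(str(home / "Documents"), str(home / "Backups" / "Documents")).run()
scheduler = BlockingScheduler()
scheduler.add_job(nightly_backup, "cron", hour=2, minute=0)  # every night at 02:00
scheduler.start()  # blocks; stop with Ctrl+C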
Find duplicate files across directories using content hashing. Groups duplicates by hash and reports wasted space.
#!/usr/bin/env python3
"""Find duplicate files by content hash. Reports groups and wasted space."""
import hashlib
import sys
from pathlib import Path
from collections import defaultdict
def file_hash(path: Path, quick: bool = False) -> str:
"""Hash file contents. Quick mode hashes first/last 4KB + size for speed."""
if quick:
size = path.stat().st_size
h = hashlib.md5(str(size).encode())
with open(path, "rb") as f:
h.update(f.read(4096))
if size > 8192:
f.seek(-4096, 2)
h.update(f.read(4096))
return h.hexdigest()
else:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def find_duplicates(directories: list, min_size: int = 1024,
exclude: set = None, quick: bool = True) -> dict:
"""Scan directories for duplicate files.
Two-pass approach for speed:
1. Group by file size (fast filter)
2. Hash only files with matching sizes
"""
exclude = exclude or {".git", "__pycache__", "node_modules", ".venv"}
# Pass 1: Group by size
size_groups = defaultdict(list)
total_scanned = 0
for directory in directories:
for item in Path(directory).rglob("*"):
if not item.is_file() or item.is_symlink():
continue
if any(part in exclude for part in item.parts):
continue
size = item.stat().st_size
if size >= min_size:
size_groups[size].append(item)
total_scanned += 1
print(f"Scanned {total_scanned} files")
# Pass 2: Hash files with duplicate sizes
candidates = {s: files for s, files in size_groups.items() if len(files) > 1}
print(f"Size matches: {sum(len(f) for f in candidates.values())} files in {len(candidates)} groups")
hash_groups = defaultdict(list)
for size, files in candidates.items():
for f in files:
try:
h = file_hash(f, quick=quick)
hash_groups[h].append(f)
except (PermissionError, OSError):
continue
# Filter to actual duplicates
duplicates = {h: files for h, files in hash_groups.items() if len(files) > 1}
# If quick mode, verify with full hash
if quick and duplicates:
verified = defaultdict(list)
for h, files in duplicates.items():
for f in files:
try:
full_h = file_hash(f, quick=False)
verified[full_h].append(f)
except (PermissionError, OSError):
continue
duplicates = {h: files for h, files in verified.items() if len(files) > 1}
return duplicates
def report(duplicates: dict):
"""Print a human-readable duplicate report."""
if not duplicates:
print("\n✅ No duplicates found!")
return
total_wasted = 0
for i, (h, files) in enumerate(duplicates.items(), 1):
size = files[0].stat().st_size
wasted = size * (len(files) - 1)
total_wasted += wasted
print(f"\n--- Group {i} ({len(files)} copies, {size:,} bytes each) ---")
for f in sorted(files, key=lambda p: str(p)):
print(f" {f}")
wasted_mb = total_wasted / (1024 * 1024)
print(f"\n📊 Summary: {len(duplicates)} duplicate groups, {wasted_mb:.1f} MB wasted")
if __name__ == "__main__":
dirs = sys.argv[1:] if len(sys.argv) > 1 else ["."]
dupes = find_duplicates(dirs)
report(dupes)
The two-pass approach (size filter first, then hash) keeps it fast even on large directories. Quick mode adds a further speedup by hashing only the first and last 4 KB of each file plus its size, then confirming candidate matches with a full SHA-256 pass.
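The report is deliberately read-only. If you want to reclaim the space afterwards, a cautious sketch of a hypothetical remove_duplicates helper (keeps the first copy in each group, previews by default) might look like this:
def remove_duplicates(duplicates: dict, apply: bool = False):
    """Keep the first file (by path order) in each group; remove the rest."""
    for files in duplicates.values():
        keep, *extras = sorted(files, key=lambda p: str(p))
        for extra in extras:
            if apply:
                extra.unlink()
                print(f" deleted {extra} (kept {keep})")
            else:
                print(f" [DRY] would delete {extra} (keeping {keep})")
# dupes = find_duplicates([str(Path.home() / "Downloads")])
# remove_duplicates(dupes)              # preview
# remove_duplicates(dupes, apply=True)  # actually delete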
These scripts are from the AI Agent Toolkit — 50+ production-ready Python scripts covering file management, web scraping, API integration, data processing, DevOps, and more. Plus 30+ curated AI prompts.
Every script is standalone, documented, and ready to use.
Get the Toolkit — $19
Sync files between two directories with conflict detection. Useful for keeping project folders, config directories, or media libraries in sync across drives.
#!/usr/bin/env python3
"""Two-way file sync with conflict detection and resolution."""
import hashlib
import json
import shutil
from pathlib import Path
from datetime import datetime
def file_hash(path: Path) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
class FileSync:
def __init__(self, dir_a: str, dir_b: str):
self.dir_a = Path(dir_a)
self.dir_b = Path(dir_b)
self.state_file = self.dir_a / ".sync_state.json"
self.state = self._load_state()
def _load_state(self) -> dict:
if self.state_file.exists():
return json.loads(self.state_file.read_text())
return {"files": {}}
def _save_state(self):
self.state_file.write_text(json.dumps(self.state, indent=2))
def _scan(self, root: Path, exclude: set) -> dict:
files = {}
for item in root.rglob("*"):
if item.is_dir() or item.name.startswith(".sync"):
continue
if any(part in exclude for part in item.parts):
continue
rel = str(item.relative_to(root))
files[rel] = {
"hash": file_hash(item),
"mtime": item.stat().st_mtime,
"size": item.stat().st_size,
}
return files
def sync(self, dry_run: bool = True, exclude: set = None,
conflict: str = "newer"):
"""Sync two directories.
conflict: "newer" (keep newer mtime), "a" (prefer dir_a), "b" (prefer dir_b)
"""
exclude = exclude or {".git", "__pycache__", "node_modules"}
files_a = self._scan(self.dir_a, exclude)
files_b = self._scan(self.dir_b, exclude)
prev = self.state["files"]
all_files = set(files_a) | set(files_b) | set(prev)
actions = []
for rel in sorted(all_files):
in_a = rel in files_a
in_b = rel in files_b
in_prev = rel in prev
if in_a and not in_b:
if in_prev:
# Was synced before, deleted from B → delete from A
actions.append(("delete_a", rel, "deleted in B"))
else:
# New in A → copy to B
actions.append(("copy_a_to_b", rel, "new in A"))
elif not in_a and in_b:
if in_prev:
actions.append(("delete_b", rel, "deleted in A"))
else:
actions.append(("copy_b_to_a", rel, "new in B"))
elif in_a and in_b:
if files_a[rel]["hash"] != files_b[rel]["hash"]:
# Conflict: both modified
if conflict == "newer":
if files_a[rel]["mtime"] >= files_b[rel]["mtime"]:
actions.append(("copy_a_to_b", rel, "newer in A"))
else:
actions.append(("copy_b_to_a", rel, "newer in B"))
elif conflict == "a":
actions.append(("copy_a_to_b", rel, "prefer A"))
else:
actions.append(("copy_b_to_a", rel, "prefer B"))
if not actions:
print("Directories are in sync.")
return
for action, rel, reason in actions:
if action == "copy_a_to_b":
src, dst = self.dir_a / rel, self.dir_b / rel
elif action == "copy_b_to_a":
src, dst = self.dir_b / rel, self.dir_a / rel
elif action.startswith("delete"):
target = self.dir_a / rel if "a" in action else self.dir_b / rel
tag = "[DRY] " if dry_run else ""
print(f" {tag}🗑 {rel} ({reason})")
if not dry_run and target.exists():
target.unlink()
continue
else:
continue
tag = "[DRY] " if dry_run else ""
print(f" {tag}→ {rel} ({reason})")
if not dry_run:
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
if not dry_run:
# Update state with current snapshot
merged = {}
for rel in set(files_a) | set(files_b):
a_info = files_a.get(rel)
b_info = files_b.get(rel)
merged[rel] = (a_info or b_info)["hash"]
self.state["files"] = merged
self._save_state()
print(f"\n{'[DRY RUN] ' if dry_run else ''}{len(actions)} actions")
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print("Usage: sync.py <dir_a> <dir_b> [--apply] [--prefer a|b|newer]")
sys.exit(1)
prefer = "newer"
if "--prefer" in sys.argv:
idx = sys.argv.index("--prefer")
prefer = sys.argv[idx + 1] if idx + 1 < len(sys.argv) else "newer"
syncer = FileSync(sys.argv[1], sys.argv[2])
syncer.sync(dry_run="--apply" not in sys.argv, conflict=prefer)
The state file tracks what was synced last time, so it correctly handles deletions (a file removed from one side gets removed from the other, not re-copied back). See the database operations guide for more on state management patterns.
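If you want to drive it from another script, say a scheduled job, a minimal usage sketch (paths are placeholders; the import assumes the script above is saved as sync.py, as in its usage string):
from sync import FileSync  # assumes the script above is saved as sync.py
syncer = FileSync("/home/me/Projects", "/mnt/backup/Projects")
syncer.sync()                                 # dry run by default; prints the planned actions
syncer.sync(dry_run=False, conflict="newer")  # apply, keeping the newer copy on conflicts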
Here's a practical workflow combining these tools:
#!/usr/bin/env python3
"""Weekly file maintenance: organize, deduplicate, backup."""
from pathlib import Path
# These imports assume the scripts above are saved next to this file as
# organize.py, duplicates.py, and backup.py (adjust the module names to match yours)
from organize import organize
from duplicates import find_duplicates, report
from backup import IncrementalBackup
def weekly_maintenance():
home = Path.home()
# 1. Organize downloads
print("=== Step 1: Organize Downloads ===")
organize(str(home / "Downloads"))
# 2. Find duplicates in common dirs
print("\n=== Step 2: Find Duplicates ===")
dupes = find_duplicates([
str(home / "Documents"),
str(home / "Downloads"),
])
report(dupes)
# 3. Backup important dirs
print("\n=== Step 3: Incremental Backup ===")
for source in ["Documents", "Projects", "Photos"]:
src = home / source
if src.exists():
backup = IncrementalBackup(str(src), str(home / "Backups" / source))
backup.run()
print("\n✅ Weekly maintenance complete!")
if __name__ == "__main__":
weekly_maintenance()
The AI Agent Toolkit includes these scripts plus 45+ more — web scraping, API automation, data processing, CLI tools, email automation, and more. All production-ready, all standalone.
Get the Full Toolkit — $19