Email remains the backbone of business communication. Every day, teams manually send reports, parse invoices, route support tickets, and follow up on leads. All of this can be automated with Python's built-in libraries — no paid API needed.
Common use cases for email automation:
Python gives you everything out of the box: smtplib for sending, imaplib for reading, and email for parsing. No external dependencies required for the basics.
Let's start with the fundamentals — sending a plain text email via SMTP:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
def send_email(to_addr: str, subject: str, body: str) -> bool:
"""Send a plain text email via SMTP."""
smtp_host = os.environ["SMTP_HOST"] # e.g., smtp.gmail.com
smtp_port = int(os.environ.get("SMTP_PORT", "587"))
smtp_user = os.environ["SMTP_USER"]
smtp_pass = os.environ["SMTP_PASS"]
from_addr = os.environ.get("FROM_ADDR", smtp_user)
msg = MIMEMultipart()
msg["From"] = from_addr
msg["To"] = to_addr
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
try:
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.ehlo()
server.starttls()
server.ehlo()
server.login(smtp_user, smtp_pass)
server.sendmail(from_addr, to_addr, msg.as_string())
print(f"Email sent to {to_addr}")
return True
except smtplib.SMTPException as e:
print(f"Failed to send email: {e}")
return False
# Usage
send_email(
"colleague@example.com",
"Daily Report — March 25",
"All systems operational. No incidents in the last 24 hours."
)
def send_bulk_email(recipients: list[str], subject: str, body: str,
delay: float = 1.0) -> dict:
"""Send to multiple recipients with rate limiting."""
import time
results = {"sent": [], "failed": []}
for addr in recipients:
success = send_email(addr, subject, body)
if success:
results["sent"].append(addr)
else:
results["failed"].append(addr)
time.sleep(delay) # Rate limiting — be respectful
print(f"Sent: {len(results['sent'])}, Failed: {len(results['failed'])}")
return results
Plain text is fine for alerts, but reports and newsletters need HTML formatting. Here's how to send rich HTML emails with file attachments:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from pathlib import Path
import os
def send_html_email(to_addr: str, subject: str, html_body: str,
text_body: str = "", attachments: list[str] = None) -> bool:
"""Send an HTML email with optional attachments."""
smtp_host = os.environ["SMTP_HOST"]
smtp_port = int(os.environ.get("SMTP_PORT", "587"))
smtp_user = os.environ["SMTP_USER"]
smtp_pass = os.environ["SMTP_PASS"]
from_addr = os.environ.get("FROM_ADDR", smtp_user)
msg = MIMEMultipart("alternative")
msg["From"] = from_addr
msg["To"] = to_addr
msg["Subject"] = subject
# Plain text fallback
if text_body:
msg.attach(MIMEText(text_body, "plain"))
# HTML content
msg.attach(MIMEText(html_body, "html"))
# Attachments
for filepath in (attachments or []):
path = Path(filepath)
if not path.exists():
print(f"Warning: attachment not found: {filepath}")
continue
part = MIMEBase("application", "octet-stream")
part.set_payload(path.read_bytes())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={path.name}"
)
msg.attach(part)
try:
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.ehlo()
server.starttls()
server.login(smtp_user, smtp_pass)
server.sendmail(from_addr, to_addr, msg.as_string())
return True
except smtplib.SMTPException as e:
print(f"Failed to send: {e}")
return False
# Usage: send a report with a CSV attachment
html = """
<h2>Weekly Sales Report</h2>
<p>Total revenue: <strong>$12,450</strong></p>
<table border="1" cellpadding="8">
<tr><th>Product</th><th>Units</th><th>Revenue</th></tr>
<tr><td>Widget A</td><td>340</td><td>$6,800</td></tr>
<tr><td>Widget B</td><td>225</td><td>$5,650</td></tr>
</table>
"""
send_html_email(
"boss@example.com",
"Weekly Sales Report — Week 12",
html,
text_body="Weekly sales: $12,450 total. See attachment for details.",
attachments=["reports/sales_week12.csv"]
)
Reading emails is where automation gets really powerful. With imaplib, you can search, filter, and process emails programmatically:
import imaplib
import email
from email.header import decode_header
from dataclasses import dataclass
from datetime import datetime
import os
@dataclass
class EmailMessage:
uid: str
sender: str
subject: str
date: str
body_text: str
body_html: str
attachments: list[dict]
def connect_imap() -> imaplib.IMAP4_SSL:
"""Connect to IMAP server and select inbox."""
host = os.environ["IMAP_HOST"] # e.g., imap.gmail.com
user = os.environ["IMAP_USER"]
passwd = os.environ["IMAP_PASS"]
mail = imaplib.IMAP4_SSL(host)
mail.login(user, passwd)
mail.select("INBOX")
return mail
def decode_subject(subject_header) -> str:
"""Decode email subject from MIME encoding."""
if subject_header is None:
return "(no subject)"
parts = decode_header(subject_header)
decoded = []
for content, encoding in parts:
if isinstance(content, bytes):
decoded.append(content.decode(encoding or "utf-8", errors="replace"))
else:
decoded.append(content)
return " ".join(decoded)
def search_emails(mail: imaplib.IMAP4_SSL,
criteria: str = "ALL",
limit: int = 10) -> list[str]:
"""Search emails and return UIDs."""
status, data = mail.uid("search", None, criteria)
if status != "OK":
return []
uids = data[0].split()
return [uid.decode() for uid in uids[-limit:]] # Latest N
def fetch_email(mail: imaplib.IMAP4_SSL, uid: str) -> EmailMessage:
"""Fetch and parse a single email by UID."""
status, data = mail.uid("fetch", uid, "(RFC822)")
if status != "OK":
raise ValueError(f"Failed to fetch UID {uid}")
raw = data[0][1]
msg = email.message_from_bytes(raw)
# Extract body and attachments
body_text = ""
body_html = ""
attachments = []
for part in msg.walk():
content_type = part.get_content_type()
disposition = str(part.get("Content-Disposition", ""))
if "attachment" in disposition:
filename = part.get_filename() or "unnamed"
attachments.append({
"filename": filename,
"content_type": content_type,
"size": len(part.get_payload(decode=True) or b""),
"data": part.get_payload(decode=True)
})
elif content_type == "text/plain":
payload = part.get_payload(decode=True)
if payload:
body_text = payload.decode(
part.get_content_charset() or "utf-8", errors="replace"
)
elif content_type == "text/html":
payload = part.get_payload(decode=True)
if payload:
body_html = payload.decode(
part.get_content_charset() or "utf-8", errors="replace"
)
return EmailMessage(
uid=uid,
sender=msg.get("From", ""),
subject=decode_subject(msg.get("Subject")),
date=msg.get("Date", ""),
body_text=body_text,
body_html=body_html,
attachments=attachments,
)
# Usage: read the 5 latest unread emails
mail = connect_imap()
uids = search_emails(mail, "UNSEEN", limit=5)
for uid in uids:
em = fetch_email(mail, uid)
print(f"From: {em.sender}")
print(f"Subject: {em.subject}")
print(f"Attachments: {len(em.attachments)}")
print(f"Preview: {em.body_text[:200]}")
print("---")
mail.logout()
# Unread messages
search_emails(mail, "UNSEEN")
# From a specific sender
search_emails(mail, 'FROM "boss@example.com"')
# Messages with a keyword in subject
search_emails(mail, 'SUBJECT "invoice"')
# Since a specific date (DD-Mon-YYYY)
search_emails(mail, 'SINCE "20-Mar-2026"')
# Combine criteria (AND is implicit)
search_emails(mail, 'UNSEEN FROM "support@stripe.com" SINCE "01-Mar-2026"')
# OR logic
search_emails(mail, 'OR FROM "alice@co.com" FROM "bob@co.com"')
Once you can read emails, the next step is extracting structured data. Here's a parser that handles common patterns:
import re
from pathlib import Path
class EmailParser:
"""Extract structured data from email content."""
@staticmethod
def extract_amounts(text: str) -> list[float]:
"""Find dollar amounts in text."""
pattern = r'\$[\d,]+\.?\d*'
matches = re.findall(pattern, text)
return [float(m.replace('$', '').replace(',', '')) for m in matches]
@staticmethod
def extract_urls(text: str) -> list[str]:
"""Find URLs in text."""
pattern = r'https?://[^\s<>"\')\]]+'
return re.findall(pattern, text)
@staticmethod
def extract_emails(text: str) -> list[str]:
"""Find email addresses in text."""
pattern = r'[\w.+-]+@[\w-]+\.[\w.-]+'
return re.findall(pattern, text)
@staticmethod
def extract_dates(text: str) -> list[str]:
"""Find common date formats."""
patterns = [
r'\d{4}-\d{2}-\d{2}', # 2026-03-25
r'\d{1,2}/\d{1,2}/\d{2,4}', # 3/25/2026
r'\w+ \d{1,2},? \d{4}', # March 25, 2026
]
dates = []
for p in patterns:
dates.extend(re.findall(p, text))
return dates
@staticmethod
def save_attachments(email_msg: 'EmailMessage',
output_dir: str = "attachments") -> list[str]:
"""Save all attachments to disk."""
out = Path(output_dir)
out.mkdir(parents=True, exist_ok=True)
saved = []
for att in email_msg.attachments:
filepath = out / att["filename"]
# Avoid overwrites
counter = 1
while filepath.exists():
stem = Path(att["filename"]).stem
suffix = Path(att["filename"]).suffix
filepath = out / f"{stem}_{counter}{suffix}"
counter += 1
filepath.write_bytes(att["data"])
saved.append(str(filepath))
print(f"Saved: {filepath} ({att['size']} bytes)")
return saved
# Usage
parser = EmailParser()
mail = connect_imap()
uids = search_emails(mail, 'SUBJECT "invoice"', limit=5)
for uid in uids:
em = fetch_email(mail, uid)
# Extract financial data
amounts = parser.extract_amounts(em.body_text)
if amounts:
print(f"Invoice from {em.sender}: amounts = {amounts}")
# Save attachments (PDFs, CSVs, etc.)
if em.attachments:
files = parser.save_attachments(em, "invoices")
print(f"Saved {len(files)} attachments")
mail.logout()
Now let's combine everything into a real processing pipeline — a system that monitors your inbox, matches rules, and takes actions automatically:
import imaplib
import time
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
@dataclass
class Rule:
name: str
match_from: str = "" # Regex for sender
match_subject: str = "" # Regex for subject
match_body: str = "" # Regex for body
actions: list[str] = field(default_factory=list)
# Actions: "save_attachments", "forward:addr@example.com",
# "label:important", "log", "webhook:https://..."
class EmailPipeline:
"""Rule-based email processing pipeline."""
def __init__(self, rules: list[Rule], state_file: str = "pipeline_state.json"):
self.rules = rules
self.state_file = Path(state_file)
self.state = self._load_state()
def _load_state(self) -> dict:
if self.state_file.exists():
return json.loads(self.state_file.read_text())
return {"last_uid": "0", "processed": 0}
def _save_state(self):
self.state_file.write_text(json.dumps(self.state, indent=2))
def match_rule(self, email_msg: EmailMessage, rule: Rule) -> bool:
"""Check if an email matches a rule."""
import re
if rule.match_from:
if not re.search(rule.match_from, email_msg.sender, re.I):
return False
if rule.match_subject:
if not re.search(rule.match_subject, email_msg.subject, re.I):
return False
if rule.match_body:
if not re.search(rule.match_body, email_msg.body_text, re.I):
return False
return True
def execute_actions(self, email_msg: EmailMessage, rule: Rule):
"""Execute all actions for a matched rule."""
for action in rule.actions:
if action == "save_attachments":
parser = EmailParser()
parser.save_attachments(email_msg, f"attachments/{rule.name}")
elif action == "log":
logger.info(
f"[{rule.name}] {email_msg.sender} — {email_msg.subject}"
)
elif action.startswith("forward:"):
to_addr = action.split(":", 1)[1]
send_email(
to_addr,
f"FWD: {email_msg.subject}",
f"Forwarded from {email_msg.sender}\n\n{email_msg.body_text}"
)
logger.info(f"Forwarded to {to_addr}")
elif action.startswith("webhook:"):
import urllib.request
url = action.split(":", 1)[1]
payload = json.dumps({
"from": email_msg.sender,
"subject": email_msg.subject,
"date": email_msg.date,
"preview": email_msg.body_text[:500],
"rule": rule.name,
}).encode()
req = urllib.request.Request(
url, data=payload,
headers={"Content-Type": "application/json"}
)
urllib.request.urlopen(req, timeout=10)
logger.info(f"Webhook sent to {url}")
def process_new_emails(self):
"""Check for new emails and process them against rules."""
mail = connect_imap()
try:
# Search for emails newer than our last processed UID
uids = search_emails(mail, "UNSEEN", limit=50)
new_count = 0
for uid in uids:
if int(uid) <= int(self.state["last_uid"]):
continue
em = fetch_email(mail, uid)
new_count += 1
# Check each rule
matched = False
for rule in self.rules:
if self.match_rule(em, rule):
logger.info(f"Rule '{rule.name}' matched: {em.subject}")
self.execute_actions(em, rule)
matched = True
if not matched:
logger.debug(f"No rules matched: {em.subject}")
self.state["last_uid"] = uid
self.state["processed"] += 1
self._save_state()
logger.info(f"Processed {new_count} new emails "
f"(total: {self.state['processed']})")
finally:
mail.logout()
def run_forever(self, interval: int = 60):
"""Poll for new emails at a regular interval."""
logger.info(f"Starting email pipeline (checking every {interval}s)")
while True:
try:
self.process_new_emails()
except Exception as e:
logger.error(f"Pipeline error: {e}", exc_info=True)
time.sleep(interval)
# Setup rules
rules = [
Rule(
name="invoices",
match_subject=r"invoice|receipt|payment",
actions=["save_attachments", "log"]
),
Rule(
name="urgent-alerts",
match_from=r"alerts@myservice\.com",
match_subject=r"(critical|down|error)",
actions=["forward:oncall@team.com", "webhook:https://hooks.slack.com/xxx"]
),
Rule(
name="support-tickets",
match_from=r"support@|help@",
actions=["log", "webhook:https://api.crm.com/tickets"]
),
]
# Run
pipeline = EmailPipeline(rules)
pipeline.run_forever(interval=120) # Check every 2 minutes
For sending personalized emails at scale, templates are essential. Here's how to use Jinja2 (one pip install jinja2 away):
from jinja2 import Template
# Define a template
WELCOME_TEMPLATE = """
<html>
<body style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;">
<h2 style="color: #333;">Welcome, {{ name }}!</h2>
<p>Thanks for signing up for {{ product }}. Here's what's next:</p>
<ol>
{% for step in onboarding_steps %}
<li>{{ step }}</li>
{% endfor %}
</ol>
{% if trial_days %}
<p>Your free trial expires in <strong>{{ trial_days }} days</strong>.</p>
{% endif %}
<p>Questions? Just reply to this email.</p>
<p>— The {{ product }} Team</p>
</body>
</html>
"""
def send_welcome_email(user: dict):
"""Send a personalized welcome email."""
template = Template(WELCOME_TEMPLATE)
html = template.render(
name=user["name"],
product="AI Toolkit Pro",
onboarding_steps=[
"Check your dashboard",
"Import your first dataset",
"Run your first automation",
"Invite your team members",
],
trial_days=user.get("trial_days", 14),
)
send_html_email(
user["email"],
f"Welcome to AI Toolkit Pro, {user['name']}!",
html,
text_body=f"Welcome, {user['name']}! Thanks for signing up."
)
# Send to a list of new users
new_users = [
{"name": "Alice", "email": "alice@example.com", "trial_days": 14},
{"name": "Bob", "email": "bob@example.com", "trial_days": 30},
]
for user in new_users:
send_welcome_email(user)
import time; time.sleep(2) # Rate limit
Email automation in production needs to handle failures gracefully. Here are the patterns that matter:
import time
import smtplib
def send_with_retry(to_addr: str, subject: str, body: str,
max_retries: int = 3) -> bool:
"""Send email with exponential backoff on failure."""
for attempt in range(max_retries):
try:
return send_email(to_addr, subject, body)
except smtplib.SMTPServerDisconnected:
wait = 2 ** attempt # 1s, 2s, 4s
print(f"Connection lost, retrying in {wait}s (attempt {attempt + 1})")
time.sleep(wait)
except smtplib.SMTPRecipientsRefused as e:
print(f"Recipient refused: {e}")
return False # Don't retry — address is invalid
except smtplib.SMTPException as e:
wait = 2 ** attempt
print(f"SMTP error: {e}, retrying in {wait}s")
time.sleep(wait)
print(f"Failed after {max_retries} attempts")
return False
class SMTPPool:
"""Reuse SMTP connection for batch sending."""
def __init__(self):
self.connection = None
def _connect(self):
import os
self.connection = smtplib.SMTP(
os.environ["SMTP_HOST"],
int(os.environ.get("SMTP_PORT", "587"))
)
self.connection.ehlo()
self.connection.starttls()
self.connection.login(
os.environ["SMTP_USER"],
os.environ["SMTP_PASS"]
)
def send(self, from_addr: str, to_addr: str, msg: str) -> bool:
"""Send using pooled connection, reconnect if needed."""
for attempt in range(2):
try:
if self.connection is None:
self._connect()
self.connection.sendmail(from_addr, to_addr, msg)
return True
except (smtplib.SMTPServerDisconnected, smtplib.SMTPException):
self.connection = None # Force reconnect
if attempt == 0:
continue
raise
def close(self):
if self.connection:
try:
self.connection.quit()
except Exception:
pass
self.connection = None
import time
from collections import deque
class RateLimiter:
"""Sliding window rate limiter for email sending."""
def __init__(self, max_per_minute: int = 30, max_per_hour: int = 500):
self.max_per_minute = max_per_minute
self.max_per_hour = max_per_hour
self.minute_window = deque()
self.hour_window = deque()
def wait_if_needed(self):
"""Block until we're within rate limits."""
now = time.time()
# Clean old entries
while self.minute_window and now - self.minute_window[0] > 60:
self.minute_window.popleft()
while self.hour_window and now - self.hour_window[0] > 3600:
self.hour_window.popleft()
# Wait if at limit
if len(self.minute_window) >= self.max_per_minute:
sleep_time = 60 - (now - self.minute_window[0])
if sleep_time > 0:
print(f"Rate limit: waiting {sleep_time:.1f}s (minute)")
time.sleep(sleep_time)
if len(self.hour_window) >= self.max_per_hour:
sleep_time = 3600 - (now - self.hour_window[0])
if sleep_time > 0:
print(f"Rate limit: waiting {sleep_time:.1f}s (hour)")
time.sleep(sleep_time)
self.minute_window.append(time.time())
self.hour_window.append(time.time())
Email credentials are sensitive. Follow these rules:
import os
from pathlib import Path
def load_email_config() -> dict:
"""Load email config from environment or .env file."""
# Try environment first
config = {
"smtp_host": os.environ.get("SMTP_HOST"),
"smtp_port": int(os.environ.get("SMTP_PORT", "587")),
"smtp_user": os.environ.get("SMTP_USER"),
"smtp_pass": os.environ.get("SMTP_PASS"),
"imap_host": os.environ.get("IMAP_HOST"),
"imap_user": os.environ.get("IMAP_USER"),
"imap_pass": os.environ.get("IMAP_PASS"),
}
# Fall back to .env file
env_file = Path(".env")
if env_file.exists() and not config["smtp_host"]:
for line in env_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
os.environ[key.strip()] = value.strip()
# Reload
return load_email_config()
# Validate required fields
required = ["smtp_host", "smtp_user", "smtp_pass"]
missing = [k for k in required if not config.get(k)]
if missing:
raise ValueError(f"Missing email config: {', '.join(missing)}")
return config
Let's put it all together — a complete script that monitors your inbox for invoices, extracts amounts, saves PDFs, and creates a summary report:
#!/usr/bin/env python3
"""
Invoice Processor — monitors inbox, extracts invoice data, generates reports.
Requires: pip install jinja2 (optional, for HTML reports)
"""
import imaplib
import email
import re
import json
import csv
import logging
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field, asdict
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("invoice_processor.log"),
]
)
logger = logging.getLogger(__name__)
@dataclass
class Invoice:
sender: str
subject: str
date: str
amounts: list[float] = field(default_factory=list)
total: float = 0.0
attachments: list[str] = field(default_factory=list)
uid: str = ""
class InvoiceProcessor:
def __init__(self, output_dir: str = "invoices"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.invoices: list[Invoice] = []
self.state_file = self.output_dir / "state.json"
self.state = self._load_state()
def _load_state(self) -> dict:
if self.state_file.exists():
return json.loads(self.state_file.read_text())
return {"processed_uids": [], "last_run": None}
def _save_state(self):
self.state["last_run"] = datetime.now().isoformat()
self.state_file.write_text(json.dumps(self.state, indent=2))
def scan_inbox(self):
"""Scan for invoice emails and process them."""
import os
mail = imaplib.IMAP4_SSL(os.environ["IMAP_HOST"])
mail.login(os.environ["IMAP_USER"], os.environ["IMAP_PASS"])
mail.select("INBOX")
# Search for invoice-related emails
criteria = 'OR SUBJECT "invoice" OR SUBJECT "receipt" SUBJECT "payment"'
status, data = mail.uid("search", None, criteria)
if status != "OK":
logger.error("IMAP search failed")
return
uids = data[0].split()
new_count = 0
for uid_bytes in uids:
uid = uid_bytes.decode()
if uid in self.state["processed_uids"]:
continue
try:
invoice = self._process_email(mail, uid)
if invoice:
self.invoices.append(invoice)
new_count += 1
self.state["processed_uids"].append(uid)
except Exception as e:
logger.error(f"Error processing UID {uid}: {e}")
mail.logout()
self._save_state()
logger.info(f"Processed {new_count} new invoice emails")
def _process_email(self, mail, uid: str) -> Invoice | None:
"""Extract invoice data from a single email."""
status, data = mail.uid("fetch", uid, "(RFC822)")
if status != "OK":
return None
msg = email.message_from_bytes(data[0][1])
body = ""
saved_files = []
for part in msg.walk():
ctype = part.get_content_type()
disposition = str(part.get("Content-Disposition", ""))
if "attachment" in disposition:
filename = part.get_filename() or f"attachment_{uid}"
filepath = self.output_dir / filename
counter = 1
while filepath.exists():
stem = Path(filename).stem
suffix = Path(filename).suffix
filepath = self.output_dir / f"{stem}_{counter}{suffix}"
counter += 1
payload = part.get_payload(decode=True)
if payload:
filepath.write_bytes(payload)
saved_files.append(str(filepath))
logger.info(f"Saved attachment: {filepath}")
elif ctype == "text/plain" and not body:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
body = payload.decode(charset, errors="replace")
# Extract amounts
amounts = [
float(m.replace(",", ""))
for m in re.findall(r'\$?([\d,]+\.\d{2})', body)
]
if not amounts and not saved_files:
return None # Not a real invoice
invoice = Invoice(
sender=msg.get("From", ""),
subject=msg.get("Subject", ""),
date=msg.get("Date", ""),
amounts=amounts,
total=max(amounts) if amounts else 0.0,
attachments=saved_files,
uid=uid,
)
logger.info(
f"Invoice: {invoice.sender} | ${invoice.total:.2f} | "
f"{len(saved_files)} files"
)
return invoice
def generate_report(self) -> str:
"""Generate a CSV summary of all processed invoices."""
report_path = self.output_dir / f"report_{datetime.now():%Y%m%d}.csv"
with open(report_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["Date", "Sender", "Subject", "Total", "Files"])
for inv in sorted(self.invoices, key=lambda x: x.date):
writer.writerow([
inv.date, inv.sender, inv.subject,
f"${inv.total:.2f}", len(inv.attachments)
])
total = sum(inv.total for inv in self.invoices)
logger.info(
f"Report: {len(self.invoices)} invoices, "
f"total: ${total:.2f} → {report_path}"
)
return str(report_path)
if __name__ == "__main__":
processor = InvoiceProcessor()
processor.scan_inbox()
if processor.invoices:
report = processor.generate_report()
print(f"\nReport saved: {report}")
print(f"Total invoices: {len(processor.invoices)}")
print(f"Total amount: ${sum(i.total for i in processor.invoices):.2f}")
else:
print("No new invoices found.")
The AI Toolkit includes email automation, web scraping, API integrations, data pipelines, and more — all production-ready with error handling and documentation.
Get the AI Toolkit — $19