Automate Email with Python — Send, Read & Process Like a Pro

By Kristy · March 2026 · 15 min read

Table of Contents

Why Automate Email?

Email remains the backbone of business communication. Every day, teams manually send reports, parse invoices, route support tickets, and follow up on leads. All of this can be automated with Python's built-in libraries — no paid API needed.

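Common use cases for email automation include sending scheduled reports, parsing invoices, routing support tickets, and following up on leads.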

Python gives you everything out of the box: smtplib for sending, imaplib for reading, and email for parsing. No external dependencies required for the basics.

Sending Emails with SMTP

Let's start with the fundamentals — sending a plain text email via SMTP:

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os


def send_email(to_addr: str, subject: str, body: str) -> bool:
    """Send a plain text email via SMTP, upgrading the connection with STARTTLS.

    Connection settings come from the environment: ``SMTP_HOST``,
    ``SMTP_USER`` and ``SMTP_PASS`` are required, ``SMTP_PORT`` defaults to
    587 and ``FROM_ADDR`` defaults to ``SMTP_USER``.

    Args:
        to_addr: Recipient address.
        subject: Subject line.
        body: Plain text body.

    Returns:
        True when the server accepted the message; False when sending failed
        (SMTP error, refused or unreachable host, DNS failure, timeout).

    Raises:
        KeyError: A required environment variable is missing.
        ValueError: ``SMTP_PORT`` is not an integer.
    """
    import ssl

    smtp_host = os.environ["SMTP_HOST"]      # e.g., smtp.gmail.com
    smtp_port = int(os.environ.get("SMTP_PORT", "587"))
    smtp_user = os.environ["SMTP_USER"]
    smtp_pass = os.environ["SMTP_PASS"]
    from_addr = os.environ.get("FROM_ADDR", smtp_user)

    msg = MIMEMultipart()
    msg["From"] = from_addr
    msg["To"] = to_addr
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))

    try:
        # A timeout keeps an unresponsive server from blocking the caller forever.
        with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
            server.ehlo()
            # An explicit default context turns on certificate and hostname
            # verification before the credentials are sent.
            server.starttls(context=ssl.create_default_context())
            server.ehlo()
            server.login(smtp_user, smtp_pass)
            server.sendmail(from_addr, to_addr, msg.as_string())
    except (smtplib.SMTPException, OSError) as e:
        # OSError covers connection-level failures (refused, DNS, timeout,
        # TLS errors) so they follow the same bool contract as SMTP errors.
        print(f"Failed to send email: {e}")
        return False

    print(f"Email sent to {to_addr}")
    return True


# Usage: send a one-off status report (arguments spelled out by keyword)
send_email(
    to_addr="colleague@example.com",
    subject="Daily Report — March 25",
    body="All systems operational. No incidents in the last 24 hours.",
)
Tip: Always use starttls() on port 587 (submission) or connect directly to port 465 with SMTP_SSL. Never send credentials over an unencrypted connection.

Sending to Multiple Recipients

def send_bulk_email(recipients: list[str], subject: str, body: str,
                    delay: float = 1.0) -> dict:
    """Send the same message to several recipients, pausing between sends.

    Args:
        recipients: Addresses to send to, one message per address.
        subject: Subject line used for every message.
        body: Plain text body used for every message.
        delay: Seconds to wait between two consecutive sends. No pause is
            made before the first or after the last message; values <= 0
            disable the pause.

    Returns:
        A dict with two lists, ``"sent"`` and ``"failed"``, holding the
        addresses for which ``send_email`` returned True / False.
    """
    import time

    results = {"sent": [], "failed": []}

    for index, addr in enumerate(recipients):
        # Pause *between* sends only, so the caller is not held up after
        # the final message has already gone out.
        if index and delay > 0:
            time.sleep(delay)  # Rate limiting — be respectful

        bucket = "sent" if send_email(addr, subject, body) else "failed"
        results[bucket].append(addr)

    print(f"Sent: {len(results['sent'])}, Failed: {len(results['failed'])}")
    return results

Sending HTML Emails with Attachments

Plain text is fine for alerts, but reports and newsletters need HTML formatting. Here's how to send rich HTML emails with file attachments:

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from pathlib import Path
import os


def send_html_email(to_addr: str, subject: str, html_body: str,
                    text_body: str = "", attachments: list[str] = None) -> bool:
    """Send an HTML email with optional attachments."""
    smtp_host = os.environ["SMTP_HOST"]
    smtp_port = int(os.environ.get("SMTP_PORT", "587"))
    smtp_user = os.environ["SMTP_USER"]
    smtp_pass = os.environ["SMTP_PASS"]
    from_addr = os.environ.get("FROM_ADDR", smtp_user)

    msg = MIMEMultipart("alternative")
    msg["From"] = from_addr
    msg["To"] = to_addr
    msg["Subject"] = subject

    # Plain text fallback
    if text_body:
        msg.attach(MIMEText(text_body, "plain"))

    # HTML content
    msg.attach(MIMEText(html_body, "html"))

    # Attachments
    for filepath in (attachments or []):
        path = Path(filepath)
        if not path.exists():
            print(f"Warning: attachment not found: {filepath}")
            continue

        part = MIMEBase("application", "octet-stream")
        part.set_payload(path.read_bytes())
        encoders.encode_base64(part)
        part.add_header(
            "Content-Disposition",
            f"attachment; filename={path.name}"
        )
        msg.attach(part)

    try:
        with smtplib.SMTP(smtp_host, smtp_port) as server:
            server.ehlo()
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.sendmail(from_addr, to_addr, msg.as_string())
        return True
    except smtplib.SMTPException as e:
        print(f"Failed to send: {e}")
        return False


# Usage: build a small HTML report and mail it together with a CSV file
html = """
<h2>Weekly Sales Report</h2>
<p>Total revenue: <strong>$12,450</strong></p>
<table border="1" cellpadding="8">
  <tr><th>Product</th><th>Units</th><th>Revenue</th></tr>
  <tr><td>Widget A</td><td>340</td><td>$6,800</td></tr>
  <tr><td>Widget B</td><td>225</td><td>$5,650</td></tr>
</table>
"""

send_html_email(
    to_addr="boss@example.com",
    subject="Weekly Sales Report — Week 12",
    html_body=html,
    text_body="Weekly sales: $12,450 total. See attachment for details.",
    attachments=["reports/sales_week12.csv"],
)

Reading Emails with IMAP

Reading emails is where automation gets really powerful. With imaplib, you can search, filter, and process emails programmatically:

import imaplib
import email
from email.header import decode_header
from dataclasses import dataclass
from datetime import datetime
import os


@dataclass
class EmailMessage:
    """Parsed view of one fetched email, as built by ``fetch_email``."""

    uid: str                 # IMAP UID the message was fetched with
    sender: str              # raw "From" header ("" when the header is absent)
    subject: str             # subject after decode_subject()
    date: str                # raw "Date" header string (not parsed)
    body_text: str           # decoded text/plain body ("" if none found)
    body_html: str           # decoded text/html body ("" if none found)
    # One dict per attachment with the keys "filename", "content_type",
    # "size" and "data" (decoded payload bytes).
    attachments: list[dict]


def connect_imap() -> imaplib.IMAP4_SSL:
    """Open an authenticated IMAP-over-TLS connection with INBOX selected.

    Reads ``IMAP_HOST``, ``IMAP_USER`` and ``IMAP_PASS`` from the
    environment. The caller owns the returned connection and must call
    ``logout()`` on it when done.

    Returns:
        The logged-in connection.

    Raises:
        KeyError: A required environment variable is missing.
        imaplib.IMAP4.error: The server rejected the login or the select.
        OSError: The connection or TLS handshake failed.
    """
    import ssl

    host = os.environ["IMAP_HOST"]      # e.g., imap.gmail.com
    user = os.environ["IMAP_USER"]
    passwd = os.environ["IMAP_PASS"]

    # An explicit default context enables certificate and hostname checks
    # before the password is sent.
    mail = imaplib.IMAP4_SSL(host, ssl_context=ssl.create_default_context())
    try:
        mail.login(user, passwd)
        mail.select("INBOX")
    except Exception:
        # Do not leak the socket when authentication fails; pollers that
        # retry every few minutes would otherwise pile up open connections.
        mail.shutdown()
        raise
    return mail


def decode_subject(subject_header) -> str:
    """Decode a (possibly MIME encoded-word) Subject header to text.

    Args:
        subject_header: Raw header value, or None when the header is absent.

    Returns:
        The decoded subject, or ``"(no subject)"`` when the header is None.
        Bytes that cannot be decoded are replaced rather than raising.
    """
    if subject_header is None:
        return "(no subject)"

    decoded = []
    for content, encoding in decode_header(subject_header):
        if isinstance(content, bytes):
            try:
                decoded.append(content.decode(encoding or "utf-8", errors="replace"))
            except LookupError:
                # The sender declared a charset Python does not know.
                decoded.append(content.decode("utf-8", errors="replace"))
        else:
            decoded.append(content)

    # decode_header() keeps the whitespace that separates plain and encoded
    # chunks, so the pieces are concatenated as-is; joining with " " would
    # produce doubled spaces ("Re:  café").
    return "".join(decoded)


def search_emails(mail: imaplib.IMAP4_SSL,
                  criteria: str = "ALL",
                  limit: int = 10) -> list[str]:
    """Run an IMAP UID SEARCH and return the last ``limit`` matching UIDs.

    Args:
        mail: Connection with a mailbox already selected.
        criteria: IMAP search criteria string, e.g. ``'UNSEEN'``.
        limit: Maximum number of UIDs to return, taken from the end of the
            server's result list. Values <= 0 return an empty list.

    Returns:
        UIDs as strings, in the order the server returned them; empty when
        the search fails or nothing matches.
    """
    # uids[-0:] would be the whole list, so handle a non-positive limit first.
    if limit <= 0:
        return []

    status, data = mail.uid("search", None, criteria)
    if status != "OK" or not data or not data[0]:
        return []

    uids = data[0].split()
    return [uid.decode() for uid in uids[-limit:]]  # Latest N


def _decode_text_part(part) -> str:
    """Return a text MIME part's decoded content, or "" when it is empty."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # The declared charset is unknown to Python; fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def fetch_email(mail: imaplib.IMAP4_SSL, uid: str) -> EmailMessage:
    """Fetch one message by UID and parse it into an ``EmailMessage``.

    Args:
        mail: Connection with a mailbox already selected.
        uid: UID of the message to fetch.

    Returns:
        The parsed message. ``body_text`` / ``body_html`` hold the first
        non-empty text/plain / text/html part found; every part whose
        Content-Disposition contains "attachment" is listed in
        ``attachments``.

    Raises:
        ValueError: The fetch failed or the server returned no message data
            for this UID.
    """
    # NOTE(review): a plain RFC822 fetch normally sets the \Seen flag on the
    # server; confirm that is the intended side effect for UNSEEN workflows.
    status, data = mail.uid("fetch", uid, "(RFC822)")
    # A UID that no longer exists comes back as "OK" with [None].
    if status != "OK" or not data or not isinstance(data[0], tuple):
        raise ValueError(f"Failed to fetch UID {uid}")

    msg = email.message_from_bytes(data[0][1])

    # Extract body and attachments
    body_text = ""
    body_html = ""
    attachments = []

    for part in msg.walk():
        content_type = part.get_content_type()
        disposition = str(part.get("Content-Disposition", ""))

        if "attachment" in disposition:
            payload = part.get_payload(decode=True)  # decode once, reuse
            attachments.append({
                "filename": part.get_filename() or "unnamed",
                "content_type": content_type,
                "size": len(payload or b""),
                "data": payload,
            })
        elif content_type == "text/plain" and not body_text:
            # First text part wins, so text nested deeper in the message
            # (e.g. a forwarded mail) does not replace the main body.
            body_text = _decode_text_part(part)
        elif content_type == "text/html" and not body_html:
            body_html = _decode_text_part(part)

    return EmailMessage(
        uid=uid,
        sender=msg.get("From", ""),
        subject=decode_subject(msg.get("Subject")),
        date=msg.get("Date", ""),
        body_text=body_text,
        body_html=body_html,
        attachments=attachments,
    )


# Usage: read the 5 latest unread emails
mail = connect_imap()
try:
    uids = search_emails(mail, "UNSEEN", limit=5)

    for uid in uids:
        em = fetch_email(mail, uid)
        print(f"From: {em.sender}")
        print(f"Subject: {em.subject}")
        print(f"Attachments: {len(em.attachments)}")
        print(f"Preview: {em.body_text[:200]}")
        print("---")
finally:
    # Always release the connection, even if a fetch fails part-way through.
    mail.logout()

Common IMAP Search Criteria

# These examples assume `mail` is an open, logged-in connection returned by
# connect_imap(). Each call returns up to the default limit of 10 UIDs.

# Unread messages
search_emails(mail, "UNSEEN")

# From a specific sender
search_emails(mail, 'FROM "boss@example.com"')

# Messages with a keyword in subject
search_emails(mail, 'SUBJECT "invoice"')

# Since a specific date (DD-Mon-YYYY)
search_emails(mail, 'SINCE "20-Mar-2026"')

# Combine criteria (AND is implicit): all three conditions must match
search_emails(mail, 'UNSEEN FROM "support@stripe.com" SINCE "01-Mar-2026"')

# OR logic: prefix notation, OR applies to the two search keys that follow it
search_emails(mail, 'OR FROM "alice@co.com" FROM "bob@co.com"')
Tip: Use UIDs instead of sequence numbers. UIDs are persistent across sessions, while sequence numbers can change when emails are deleted or moved.

Parsing Email Content & Attachments

Once you can read emails, the next step is extracting structured data. Here's a parser that handles common patterns:

import re
from pathlib import Path


class EmailParser:
    """Extract structured data from email content.

    All methods are static; the class only groups related helpers.
    """

    # A dollar sign followed by digits (with optional thousands commas) and
    # an optional decimal part. Requiring a leading digit keeps a bare "$,"
    # from matching and crashing float().
    _AMOUNT_RE = re.compile(r'\$(\d[\d,]*(?:\.\d+)?)')
    _URL_RE = re.compile(r'https?://[^\s<>"\')\]]+')
    _EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+\.[\w.-]+')
    _DATE_RES = (
        re.compile(r'\d{4}-\d{2}-\d{2}'),          # 2026-03-25
        re.compile(r'\d{1,2}/\d{1,2}/\d{2,4}'),    # 3/25/2026
        re.compile(r'\w+ \d{1,2},? \d{4}'),        # March 25, 2026
    )

    @staticmethod
    def extract_amounts(text: str) -> list[float]:
        """Return every dollar amount in ``text`` (e.g. "$1,234.56") as a float."""
        return [
            float(digits.replace(',', ''))
            for digits in EmailParser._AMOUNT_RE.findall(text)
        ]

    @staticmethod
    def extract_urls(text: str) -> list[str]:
        """Return http(s) URLs in ``text`` without trailing sentence punctuation."""
        # A URL at the end of a sentence would otherwise keep its "." or ",".
        return [url.rstrip('.,;:!?') for url in EmailParser._URL_RE.findall(text)]

    @staticmethod
    def extract_emails(text: str) -> list[str]:
        """Return email addresses in ``text`` without a trailing "." or "-"."""
        return [addr.rstrip('.-') for addr in EmailParser._EMAIL_RE.findall(text)]

    @staticmethod
    def extract_dates(text: str) -> list[str]:
        """Return date-like substrings, grouped by format rather than by position."""
        dates = []
        for pattern in EmailParser._DATE_RES:
            dates.extend(pattern.findall(text))
        return dates

    @staticmethod
    def _safe_filename(name) -> str:
        """Reduce an attachment name from the email to a bare file name."""
        # The name comes from the sender and is untrusted: drop directory
        # parts ("../", absolute paths, Windows separators) and NUL bytes so
        # the file cannot be written outside the output directory.
        cleaned = str(name).replace("\\", "/").replace("\x00", "")
        base = Path(cleaned).name
        return base if base not in ("", ".", "..") else "unnamed"

    @staticmethod
    def save_attachments(email_msg: 'EmailMessage',
                         output_dir: str = "attachments") -> list[str]:
        """Write every attachment of ``email_msg`` into ``output_dir``.

        Args:
            email_msg: Object whose ``attachments`` is a list of dicts with
                the keys "filename", "size" and "data".
            output_dir: Target directory; created if missing.

        Returns:
            Paths of the files written, as strings. Existing files are never
            overwritten: a numeric suffix ("_1", "_2", ...) is added instead.
        """
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)
        saved = []

        for att in email_msg.attachments:
            filename = EmailParser._safe_filename(att["filename"])
            stem, suffix = Path(filename).stem, Path(filename).suffix

            filepath = out / filename
            # Avoid overwrites
            counter = 1
            while filepath.exists():
                filepath = out / f"{stem}_{counter}{suffix}"
                counter += 1

            # "data" is None for parts without a decodable payload.
            filepath.write_bytes(att["data"] or b"")
            saved.append(str(filepath))
            print(f"Saved: {filepath} ({att['size']} bytes)")

        return saved


# Usage: pull amounts and attachments out of the latest invoice emails
parser = EmailParser()

mail = connect_imap()
try:
    uids = search_emails(mail, 'SUBJECT "invoice"', limit=5)

    for uid in uids:
        em = fetch_email(mail, uid)

        # Extract financial data
        amounts = parser.extract_amounts(em.body_text)
        if amounts:
            print(f"Invoice from {em.sender}: amounts = {amounts}")

        # Save attachments (PDFs, CSVs, etc.)
        if em.attachments:
            files = parser.save_attachments(em, "invoices")
            print(f"Saved {len(files)} attachments")
finally:
    # Always release the connection, even if parsing or saving fails.
    mail.logout()

Building an Email Processing Pipeline

Now let's combine everything into a real processing pipeline — a system that monitors your inbox, matches rules, and takes actions automatically:

import imaplib
import time
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path

# Root logger: INFO and above, timestamped, level in brackets.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the pipeline below.
logger = logging.getLogger(__name__)


@dataclass
class Rule:
    """One matching rule for EmailPipeline: optional regex filters plus actions.

    An empty pattern means "do not filter on this field"; all non-empty
    patterns must match (searched case-insensitively) for the rule to apply.
    """

    name: str                   # Used in log lines and as the attachment sub-directory
    match_from: str = ""        # Regex for sender
    match_subject: str = ""     # Regex for subject
    match_body: str = ""        # Regex for body
    actions: list[str] = field(default_factory=list)
    # Actions: "save_attachments", "forward:addr@example.com",
    #          "label:important", "log", "webhook:https://..."
    # NOTE(review): EmailPipeline.execute_actions in this file has branches for
    # save_attachments, log, forward: and webhook: only. No "label:" handler
    # is visible, so that action appears to be a no-op.


class EmailPipeline:
    """Rule-based email processing pipeline.

    Polls the inbox for unread messages, matches each one against the
    configured rules and runs the actions of every matching rule. Progress
    (highest processed UID and a running count) is persisted as JSON in
    ``state_file``.
    """

    def __init__(self, rules: "list[Rule]", state_file: str = "pipeline_state.json"):
        """Store the rules and load any previously saved state."""
        self.rules = rules
        self.state_file = Path(state_file)
        self.state = self._load_state()

    def _load_state(self) -> dict:
        """Return the saved state merged over the defaults."""
        state = {"last_uid": "0", "processed": 0}
        if not self.state_file.exists():
            return state
        try:
            stored = json.loads(self.state_file.read_text())
        except json.JSONDecodeError as exc:
            logger.warning(
                f"Ignoring unreadable state file {self.state_file}: {exc}"
            )
            return state
        # Merging keeps the defaults for keys an older or partial state file
        # lacks, so later lookups of "processed" / "last_uid" cannot KeyError.
        if isinstance(stored, dict):
            state.update(stored)
        return state

    def _save_state(self):
        """Persist the state atomically (write a temp file, then rename)."""
        # A crash in the middle of a direct write would leave a truncated
        # JSON file behind; replacing a complete temp file avoids that.
        tmp_file = self.state_file.with_name(self.state_file.name + ".tmp")
        tmp_file.write_text(json.dumps(self.state, indent=2))
        tmp_file.replace(self.state_file)

    def match_rule(self, email_msg: "EmailMessage", rule: "Rule") -> bool:
        """Return True when every non-empty pattern of ``rule`` matches.

        Patterns are regexes searched case-insensitively in the sender,
        subject and plain text body respectively.
        """
        import re

        checks = (
            (rule.match_from, email_msg.sender),
            (rule.match_subject, email_msg.subject),
            (rule.match_body, email_msg.body_text),
        )
        for pattern, value in checks:
            # str() guards against header objects that are not plain strings.
            if pattern and not re.search(pattern, str(value), re.I):
                return False
        return True

    def execute_actions(self, email_msg: "EmailMessage", rule: "Rule"):
        """Run every action of ``rule`` for ``email_msg``, in order.

        Supported actions: ``save_attachments``, ``log``, ``forward:<addr>``
        and ``webhook:<url>``. Unknown actions are logged and skipped.
        Errors raised by an action propagate to the caller.
        """
        for action in rule.actions:
            if action == "save_attachments":
                parser = EmailParser()
                parser.save_attachments(email_msg, f"attachments/{rule.name}")

            elif action == "log":
                logger.info(
                    f"[{rule.name}] {email_msg.sender} — {email_msg.subject}"
                )

            elif action.startswith("forward:"):
                to_addr = action.split(":", 1)[1]
                send_email(
                    to_addr,
                    f"FWD: {email_msg.subject}",
                    f"Forwarded from {email_msg.sender}\n\n{email_msg.body_text}"
                )
                logger.info(f"Forwarded to {to_addr}")

            elif action.startswith("webhook:"):
                import urllib.request
                url = action.split(":", 1)[1]
                payload = json.dumps({
                    "from": email_msg.sender,
                    "subject": email_msg.subject,
                    "date": email_msg.date,
                    "preview": email_msg.body_text[:500],
                    "rule": rule.name,
                }).encode()
                req = urllib.request.Request(
                    url, data=payload,
                    headers={"Content-Type": "application/json"}
                )
                # The context manager closes the HTTP response so a
                # long-running poller does not leak connections.
                with urllib.request.urlopen(req, timeout=10):
                    pass
                logger.info(f"Webhook sent to {url}")

            else:
                # Make typos and unimplemented actions visible instead of
                # silently doing nothing.
                logger.warning(f"[{rule.name}] Unknown action ignored: {action}")

    def process_new_emails(self):
        """Fetch unread emails newer than the last processed UID and apply rules.

        A failing rule is logged and does not stop other rules or emails.
        State is saved even when the loop is interrupted, so emails that
        were already handled are not handled twice.
        """
        mail = connect_imap()
        try:
            # Up to 50 unread emails; ones at or below last_uid are skipped
            # in the loop because they were handled in an earlier run.
            uids = search_emails(mail, "UNSEEN", limit=50)

            new_count = 0
            try:
                for uid in uids:
                    if int(uid) <= int(self.state["last_uid"]):
                        continue

                    em = fetch_email(mail, uid)
                    new_count += 1

                    # Check each rule
                    matched = False
                    for rule in self.rules:
                        if not self.match_rule(em, rule):
                            continue
                        matched = True
                        logger.info(f"Rule '{rule.name}' matched: {em.subject}")
                        try:
                            self.execute_actions(em, rule)
                        except Exception:
                            # One broken webhook or disk error must not
                            # block the remaining rules and emails.
                            logger.exception(
                                f"Rule '{rule.name}' failed for UID {uid}"
                            )

                    if not matched:
                        logger.debug(f"No rules matched: {em.subject}")

                    self.state["last_uid"] = uid
                    self.state["processed"] += 1
            finally:
                self._save_state()

            logger.info(f"Processed {new_count} new emails "
                        f"(total: {self.state['processed']})")

        finally:
            mail.logout()

    def run_forever(self, interval: int = 60):
        """Call ``process_new_emails`` every ``interval`` seconds, forever.

        Errors from a polling round are logged with a traceback and the loop
        keeps running.
        """
        logger.info(f"Starting email pipeline (checking every {interval}s)")
        while True:
            try:
                self.process_new_emails()
            except Exception as e:
                logger.error(f"Pipeline error: {e}", exc_info=True)
            time.sleep(interval)


# Setup rules. Every rule whose patterns match an email runs its actions;
# an email can trigger more than one rule.
rules = [
    # Anything that looks like a bill: keep the files and leave a log line.
    Rule(
        name="invoices",
        match_subject=r"invoice|receipt|payment",
        actions=["save_attachments", "log"]
    ),
    # Alerts from the monitoring sender with an alarming subject:
    # forward to on-call and post to a webhook.
    Rule(
        name="urgent-alerts",
        match_from=r"alerts@myservice\.com",
        match_subject=r"(critical|down|error)",
        actions=["forward:oncall@team.com", "webhook:https://hooks.slack.com/xxx"]
    ),
    # Mail from support-style senders: log it and notify the CRM.
    Rule(
        name="support-tickets",
        match_from=r"support@|help@",
        actions=["log", "webhook:https://api.crm.com/tickets"]
    ),
]

# Run. run_forever() loops without returning, so nothing placed after this
# call will execute.
pipeline = EmailPipeline(rules)
pipeline.run_forever(interval=120)  # Check every 2 minutes
Important: Polling every few minutes is fine for most use cases. For real-time processing, look into IMAP IDLE (push notifications) — but not all providers support it reliably.

Email Templates with Jinja2

For sending personalized emails at scale, templates are essential. Here's how to use Jinja2 (one pip install jinja2 away):

from jinja2 import Template


# Jinja2 source for the welcome email. Variables used by the template:
# name, product, onboarding_steps (iterable) and trial_days (the trial
# paragraph is omitted when it is falsy).
WELCOME_TEMPLATE = """
<html>
<body style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;">
  <h2 style="color: #333;">Welcome, {{ name }}!</h2>
  <p>Thanks for signing up for {{ product }}. Here's what's next:</p>
  <ol>
    {% for step in onboarding_steps %}
    <li>{{ step }}</li>
    {% endfor %}
  </ol>
  {% if trial_days %}
  <p>Your free trial expires in <strong>{{ trial_days }} days</strong>.</p>
  {% endif %}
  <p>Questions? Just reply to this email.</p>
  <p>— The {{ product }} Team</p>
</body>
</html>
"""


def send_welcome_email(user: dict) -> bool:
    """Render the welcome template for ``user`` and send it as an HTML email.

    Args:
        user: Dict with the keys "name" and "email"; the optional
            "trial_days" defaults to 14.

    Returns:
        The result of ``send_html_email``: True when the message was sent.

    Raises:
        KeyError: ``user`` lacks "name" or "email".
    """
    # autoescape=True HTML-escapes the substituted values, so a name such as
    # "<b>Bob</b>" (or a script tag) is shown as text instead of being
    # injected into the message markup.
    template = Template(WELCOME_TEMPLATE, autoescape=True)
    html = template.render(
        name=user["name"],
        product="AI Toolkit Pro",
        onboarding_steps=[
            "Check your dashboard",
            "Import your first dataset",
            "Run your first automation",
            "Invite your team members",
        ],
        trial_days=user.get("trial_days", 14),
    )

    # Pass the send result through so callers can detect failed deliveries.
    return send_html_email(
        user["email"],
        f"Welcome to AI Toolkit Pro, {user['name']}!",
        html,
        text_body=f"Welcome, {user['name']}! Thanks for signing up."
    )


import time

# Welcome each newly registered user, pausing between sends
new_users = [
    {"name": "Alice", "email": "alice@example.com", "trial_days": 14},
    {"name": "Bob", "email": "bob@example.com", "trial_days": 30},
]

for user in new_users:
    send_welcome_email(user)
    time.sleep(2)  # Rate limit

Production Tips & Error Handling

Email automation in production needs to handle failures gracefully. Here are the patterns that matter:

Retry with Exponential Backoff

import time
import smtplib


def send_with_retry(to_addr: str, subject: str, body: str,
                    max_retries: int = 3) -> bool:
    """Send an email, retrying with exponential backoff when sending fails.

    ``send_email`` reports failures by returning False, so a False result is
    retried just like a raised SMTP or connection error. Waits between
    attempts are 1s, 2s, 4s, ...; there is no wait after the final attempt.

    Args:
        to_addr: Recipient address.
        subject: Subject line.
        body: Plain text body.
        max_retries: Maximum number of send attempts.

    Returns:
        True as soon as one attempt succeeds; False when the recipient was
        refused or every attempt failed.
    """
    for attempt in range(max_retries):
        wait = 2 ** attempt  # 1s, 2s, 4s
        try:
            if send_email(to_addr, subject, body):
                return True
            # send_email already printed the reason for the failure.
            notice = f"Send failed, retrying in {wait}s (attempt {attempt + 1})"
        except smtplib.SMTPRecipientsRefused as e:
            print(f"Recipient refused: {e}")
            return False  # Don't retry — address is invalid
        except smtplib.SMTPServerDisconnected:
            notice = f"Connection lost, retrying in {wait}s (attempt {attempt + 1})"
        except (smtplib.SMTPException, OSError) as e:
            notice = f"SMTP error: {e}, retrying in {wait}s"

        # Back off only when another attempt will actually follow.
        if attempt < max_retries - 1:
            print(notice)
            time.sleep(wait)

    print(f"Failed after {max_retries} attempts")
    return False

Connection Pooling

class SMTPPool:
    """Reuse one authenticated SMTP connection for batch sending.

    The connection is opened lazily on the first ``send``. Call ``close()``
    when done, or use the pool as a context manager::

        with SMTPPool() as pool:
            pool.send(from_addr, to_addr, msg)
    """

    def __init__(self):
        self.connection = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Close the connection; exceptions from the with-body still propagate.
        self.close()

    def _connect(self):
        """Open, secure and authenticate a connection using SMTP_* env vars."""
        import os
        import ssl

        connection = smtplib.SMTP(
            os.environ["SMTP_HOST"],
            int(os.environ.get("SMTP_PORT", "587"))
        )
        try:
            connection.ehlo()
            # Verify the server certificate before sending credentials.
            connection.starttls(context=ssl.create_default_context())
            connection.ehlo()
            connection.login(
                os.environ["SMTP_USER"],
                os.environ["SMTP_PASS"]
            )
        except Exception:
            # Never keep (or leak) a connection that is not fully logged in.
            connection.close()
            raise
        self.connection = connection

    def send(self, from_addr: str, to_addr: str, msg: str) -> bool:
        """Send ``msg`` over the pooled connection, reconnecting once if needed.

        Returns:
            True when the message was accepted.

        Raises:
            smtplib.SMTPException: The send failed again after one reconnect.
        """
        for attempt in range(2):
            try:
                if self.connection is None:
                    self._connect()
                self.connection.sendmail(from_addr, to_addr, msg)
                return True
            except smtplib.SMTPException:
                # Release the stale socket before forcing a reconnect.
                self.close()
                if attempt == 0:
                    continue
                raise

    def close(self):
        """Close the pooled connection, ignoring errors from a dead socket."""
        if self.connection:
            try:
                self.connection.quit()
            except Exception:
                # Best effort: the server may already have dropped the link.
                pass
            self.connection = None

Rate Limiting

import time
from collections import deque


class RateLimiter:
    """Sliding window rate limiter for email sending.

    Call ``wait_if_needed()`` before every send; it blocks until sending one
    more message stays within both the per-minute and the per-hour limit,
    then records the send.
    """

    def __init__(self, max_per_minute: int = 30, max_per_hour: int = 500,
                 clock=time.monotonic, sleep=time.sleep):
        """Create a limiter.

        Args:
            max_per_minute: Sends allowed in any 60 second window.
            max_per_hour: Sends allowed in any 3600 second window.
            clock: Zero-argument callable returning the current time in
                seconds. Defaults to a monotonic clock so system clock
                adjustments cannot distort the windows.
            sleep: Callable that blocks for the given number of seconds.
        """
        self.max_per_minute = max_per_minute
        self.max_per_hour = max_per_hour
        self.minute_window = deque()
        self.hour_window = deque()
        self._clock = clock
        self._sleep = sleep

    @staticmethod
    def _expire(window, span, now):
        """Drop timestamps that are at least ``span`` seconds old."""
        while window and now - window[0] >= span:
            window.popleft()

    def _wait_for_window(self, window, limit, span, label):
        """Block until ``window`` has room for one more entry."""
        now = self._clock()
        self._expire(window, span, now)
        while window and len(window) >= limit:
            sleep_time = span - (now - window[0])
            if sleep_time > 0:
                print(f"Rate limit: waiting {sleep_time:.1f}s ({label})")
                self._sleep(sleep_time)
            # Re-read the clock after sleeping so later checks do not work
            # with a stale timestamp.
            now = self._clock()
            self._expire(window, span, now)

    def wait_if_needed(self):
        """Block until we're within rate limits, then record this send."""
        self._wait_for_window(self.minute_window, self.max_per_minute, 60, "minute")
        self._wait_for_window(self.hour_window, self.max_per_hour, 3600, "hour")

        # One timestamp for both windows keeps them consistent.
        stamp = self._clock()
        self.minute_window.append(stamp)
        self.hour_window.append(stamp)

Security Best Practices

Email credentials are sensitive. Follow these rules:

  1. Never hardcode passwords. Use environment variables or a secrets manager.
  2. Use App Passwords for Gmail/Outlook instead of your main password. Enable 2FA first, then generate an app-specific password.
  3. Use OAuth2 for production apps. App passwords are fine for personal scripts; OAuth2 is required for apps that access other people's accounts.
  4. Restrict IMAP access. Only fetch what you need. Don't download entire mailboxes unnecessarily.
  5. Log actions, not content. Log that you processed an email, but don't log the body or attachments.

Loading Credentials Safely

import os
from pathlib import Path


def _load_dotenv(env_file: Path) -> None:
    """Copy KEY=VALUE lines from ``env_file`` into os.environ for unset keys."""
    for line in env_file.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip().removeprefix("export ").strip()
        value = value.strip()
        # Drop one pair of matching surrounding quotes: KEY="value".
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        # Real environment values take precedence over the file.
        if key and not os.environ.get(key):
            os.environ[key] = value


def load_email_config() -> dict:
    """Load email config from the environment, falling back to a .env file.

    The ``.env`` file in the current directory is read only when
    ``SMTP_HOST`` is not set in the environment. Values from the file are
    copied into ``os.environ`` for keys that are unset or empty.

    Returns:
        Dict with the keys smtp_host, smtp_port (int, default 587),
        smtp_user, smtp_pass, imap_host, imap_user and imap_pass. The IMAP
        entries are None when not configured.

    Raises:
        ValueError: smtp_host, smtp_user or smtp_pass is missing, or
            SMTP_PORT is not an integer.
    """
    # Read the file at most once. The previous version called itself again
    # after loading .env, which never terminated when the file did not
    # define SMTP_HOST.
    env_file = Path(".env")
    if not os.environ.get("SMTP_HOST") and env_file.is_file():
        _load_dotenv(env_file)

    config = {
        "smtp_host": os.environ.get("SMTP_HOST"),
        "smtp_port": int(os.environ.get("SMTP_PORT") or "587"),
        "smtp_user": os.environ.get("SMTP_USER"),
        "smtp_pass": os.environ.get("SMTP_PASS"),
        "imap_host": os.environ.get("IMAP_HOST"),
        "imap_user": os.environ.get("IMAP_USER"),
        "imap_pass": os.environ.get("IMAP_PASS"),
    }

    # Validate required fields
    required = ["smtp_host", "smtp_user", "smtp_pass"]
    missing = [k for k in required if not config.get(k)]
    if missing:
        raise ValueError(f"Missing email config: {', '.join(missing)}")

    return config
Gmail users: You need an App Password. Go to Google Account → Security → 2-Step Verification → App Passwords. Regular passwords won't work with SMTP/IMAP.

Complete Example: Invoice Processor

Let's put it all together — a complete script that monitors your inbox for invoices, extracts amounts, saves PDFs, and creates a summary report:

#!/usr/bin/env python3
"""
Invoice Processor — monitors inbox, extracts invoice data, generates reports.
Requires: Python standard library only (install jinja2 only if you add HTML reports)
"""
import imaplib
import email
import re
import json
import csv
import logging
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field, asdict

# Log INFO and above to the console and to invoice_processor.log.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("invoice_processor.log"),
    ],
    level=logging.INFO,
)
# Module-level logger used by InvoiceProcessor.
logger = logging.getLogger(__name__)


@dataclass
class Invoice:
    """Data extracted from one invoice email by InvoiceProcessor."""

    sender: str      # raw "From" header
    subject: str     # raw "Subject" header
    date: str        # raw "Date" header string (not parsed)
    # Every amount found in the plain text body.
    amounts: list[float] = field(default_factory=list)
    # Largest entry of `amounts`, or 0.0 when none were found.
    total: float = 0.0
    # Paths of the attachment files saved to disk.
    attachments: list[str] = field(default_factory=list)
    uid: str = ""    # IMAP UID of the source email


class InvoiceProcessor:
    """Scan an inbox for invoice emails, save attachments and build a report.

    State (UIDs already processed and the time of the last run) is kept in
    ``state.json`` inside the output directory, so repeated runs only look
    at new emails.
    """

    def __init__(self, output_dir: str = "invoices"):
        """Create the output directory if needed and load saved state."""
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.invoices: "list[Invoice]" = []
        self.state_file = self.output_dir / "state.json"
        self.state = self._load_state()

    def _load_state(self) -> dict:
        """Return the saved state merged over the defaults."""
        state = {"processed_uids": [], "last_run": None}
        if not self.state_file.exists():
            return state
        try:
            stored = json.loads(self.state_file.read_text())
        except json.JSONDecodeError as exc:
            logger.warning(
                f"Ignoring unreadable state file {self.state_file}: {exc}"
            )
            return state
        # Merge so a partial or older state file cannot cause a KeyError.
        if isinstance(stored, dict):
            state.update(stored)
        if not isinstance(state["processed_uids"], list):
            state["processed_uids"] = []
        return state

    def _save_state(self):
        """Record the run time and persist the state as JSON."""
        self.state["last_run"] = datetime.now().isoformat()
        self.state_file.write_text(json.dumps(self.state, indent=2))

    @staticmethod
    def _safe_filename(name) -> str:
        """Reduce an attachment name from the email to a bare file name."""
        # The name is chosen by the sender and is untrusted: drop directory
        # parts ("../", absolute paths, Windows separators) and NUL bytes so
        # nothing is written outside the output directory.
        cleaned = str(name).replace("\\", "/").replace("\x00", "")
        base = Path(cleaned).name
        return base if base not in ("", ".", "..") else "unnamed"

    @staticmethod
    def _date_sort_key(raw_date):
        """Sort key for a raw Date header: chronological, unparsable last."""
        from email.utils import parsedate_to_datetime
        try:
            return (0, parsedate_to_datetime(raw_date).timestamp())
        except (TypeError, ValueError, OverflowError, OSError):
            return (1, 0.0)

    def scan_inbox(self):
        """Scan for invoice emails and process the ones not seen before.

        Reads IMAP_HOST, IMAP_USER and IMAP_PASS from the environment. New
        invoices are appended to ``self.invoices``. An email whose
        processing raises is logged and retried on the next run.
        """
        import os
        import ssl

        # An explicit default context enables certificate verification.
        mail = imaplib.IMAP4_SSL(
            os.environ["IMAP_HOST"], ssl_context=ssl.create_default_context()
        )
        new_count = 0
        try:
            mail.login(os.environ["IMAP_USER"], os.environ["IMAP_PASS"])
            mail.select("INBOX")

            # Search for invoice-related emails. IMAP OR is a prefix operator
            # over two keys: OR(invoice, OR(receipt, payment)).
            criteria = 'OR SUBJECT "invoice" OR SUBJECT "receipt" SUBJECT "payment"'
            status, data = mail.uid("search", None, criteria)

            if status != "OK":
                logger.error("IMAP search failed")
                return

            # Set for O(1) membership tests; the list in self.state stays
            # the JSON-serializable record.
            already_done = set(self.state["processed_uids"])

            for uid_bytes in (data[0] or b"").split():
                uid = uid_bytes.decode()
                if uid in already_done:
                    continue

                try:
                    invoice = self._process_email(mail, uid)
                except Exception as e:
                    logger.error(f"Error processing UID {uid}: {e}")
                    continue

                if invoice:
                    self.invoices.append(invoice)
                    new_count += 1
                already_done.add(uid)
                self.state["processed_uids"].append(uid)
        finally:
            # Always release the connection, including on early return.
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError):
                pass  # connection is already unusable; nothing left to close

        self._save_state()
        logger.info(f"Processed {new_count} new invoice emails")

    def _process_email(self, mail, uid: str) -> "Invoice | None":
        """Extract invoice data from a single email.

        Saves every attachment into the output directory and collects
        amounts from the first text/plain part.

        Returns:
            An Invoice, or None when the fetch failed or the email has
            neither amounts nor attachments.
        """
        status, data = mail.uid("fetch", uid, "(RFC822)")
        # A vanished UID comes back as "OK" with [None].
        if status != "OK" or not data or not isinstance(data[0], tuple):
            return None

        msg = email.message_from_bytes(data[0][1])
        body = ""
        saved_files = []

        for part in msg.walk():
            ctype = part.get_content_type()
            disposition = str(part.get("Content-Disposition", ""))

            if "attachment" in disposition:
                filename = self._safe_filename(
                    part.get_filename() or f"attachment_{uid}"
                )
                stem, suffix = Path(filename).stem, Path(filename).suffix
                filepath = self.output_dir / filename

                # Never overwrite an existing file; add _1, _2, ... instead.
                counter = 1
                while filepath.exists():
                    filepath = self.output_dir / f"{stem}_{counter}{suffix}"
                    counter += 1

                payload = part.get_payload(decode=True)
                if payload:
                    filepath.write_bytes(payload)
                    saved_files.append(str(filepath))
                    logger.info(f"Saved attachment: {filepath}")

            elif ctype == "text/plain" and not body:
                payload = part.get_payload(decode=True)
                if payload:
                    charset = part.get_content_charset() or "utf-8"
                    try:
                        body = payload.decode(charset, errors="replace")
                    except LookupError:
                        # Unknown declared charset; fall back to UTF-8.
                        body = payload.decode("utf-8", errors="replace")

        # Extract amounts (numbers with exactly two decimals, optional "$")
        amounts = [
            float(m.replace(",", ""))
            for m in re.findall(r'\$?([\d,]+\.\d{2})', body)
        ]

        if not amounts and not saved_files:
            return None  # Not a real invoice

        invoice = Invoice(
            sender=msg.get("From", ""),
            subject=msg.get("Subject", ""),
            date=msg.get("Date", ""),
            amounts=amounts,
            # Heuristic: the largest amount mentioned is treated as the total.
            total=max(amounts) if amounts else 0.0,
            attachments=saved_files,
            uid=uid,
        )

        logger.info(
            f"Invoice: {invoice.sender} | ${invoice.total:.2f} | "
            f"{len(saved_files)} files"
        )
        return invoice

    def generate_report(self) -> str:
        """Write a CSV summary of the invoices collected in this run.

        Rows are ordered chronologically by the parsed Date header; invoices
        with an unparsable date come last.

        Returns:
            Path of the report file, as a string.
        """
        report_path = self.output_dir / f"report_{datetime.now():%Y%m%d}.csv"

        # The raw Date header starts with the weekday name, so sorting the
        # strings would not be chronological; sort by the parsed time.
        ordered = sorted(
            self.invoices, key=lambda inv: self._date_sort_key(inv.date)
        )

        # Explicit UTF-8 so non-ASCII senders and subjects do not depend on
        # the platform's default encoding.
        with open(report_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["Date", "Sender", "Subject", "Total", "Files"])
            for inv in ordered:
                writer.writerow([
                    inv.date, inv.sender, inv.subject,
                    f"${inv.total:.2f}", len(inv.attachments)
                ])

        total = sum(inv.total for inv in self.invoices)
        logger.info(
            f"Report: {len(self.invoices)} invoices, "
            f"total: ${total:.2f} → {report_path}"
        )
        return str(report_path)


if __name__ == "__main__":
    # Script entry point: scan once, then report only if something new came in.
    processor = InvoiceProcessor()
    processor.scan_inbox()

    if not processor.invoices:
        print("No new invoices found.")
    else:
        report = processor.generate_report()
        grand_total = sum(i.total for i in processor.invoices)
        print(f"\nReport saved: {report}")
        print(f"Total invoices: {len(processor.invoices)}")
        print(f"Total amount: ${grand_total:.2f}")

Want 50+ Ready-to-Use Python Automation Scripts?

The AI Toolkit includes email automation, web scraping, API integrations, data pipelines, and more — all production-ready with error handling and documentation.

Get the AI Toolkit — $19