🤖 AI Agent Toolkit

How to Build a RAG Pipeline in Python (Step-by-Step)

Retrieval-Augmented Generation (RAG) is probably the most practical AI pattern for developers right now. Instead of fine-tuning models on your data (expensive, slow, and often unnecessary), you retrieve relevant documents at query time and feed them to an LLM as context.

The result: your AI answers questions about your data, whether company docs, codebases, research papers, or product catalogs, without hallucinating (much).

In this guide, I'll walk through building a complete RAG pipeline from scratch in Python. No LangChain, no LlamaIndex, just the core components so you understand what's actually happening under the hood.

Documents → Chunking → Embeddings → Vector Store → Query → Retrieval → LLM → Answer

The Architecture

A RAG pipeline has two phases:

  1. Indexing: load documents, split them into chunks, generate embeddings, and store them
  2. Querying: take a user question, find relevant chunks, and pass them to an LLM with context

We'll build both. Here's what you need:

# requirements
# pip install openai numpy

# That's it. We're keeping dependencies minimal.
# For production, you'd add a real vector DB (Qdrant, Pinecone, Chroma).
# For this tutorial, we'll use a simple in-memory store.

Step 1: Document Loading

First, we need to load documents. In practice, these could be PDFs, markdown files, HTML pages, or database records. Let's start with plain text and markdown:

import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Document:
    """A loaded document with metadata."""
    content: str
    source: str
    metadata: dict = field(default_factory=dict)


def load_documents(path: str) -> list[Document]:
    """Load all text/markdown files from a directory."""
    docs = []
    extensions = {'.txt', '.md', '.rst', '.py', '.json'}
    
    for filepath in Path(path).rglob('*'):
        if filepath.suffix in extensions and filepath.is_file():
            try:
                content = filepath.read_text(encoding='utf-8')
                docs.append(Document(
                    content=content,
                    source=str(filepath),
                    metadata={
                        'filename': filepath.name,
                        'extension': filepath.suffix,
                        'size_bytes': filepath.stat().st_size,
                    }
                ))
            except (UnicodeDecodeError, PermissionError):
                continue
    
    return docs


# Usage
docs = load_documents('./my-knowledge-base/')
print(f"Loaded {len(docs)} documents")

Nothing fancy here. The key insight: always keep track of where each piece of text came from. When the LLM generates an answer, you want to cite the source.

Step 2: Chunking โ€” The Most Important Step

This is where most RAG pipelines succeed or fail. If your chunks are too large, the LLM gets diluted context. Too small, and you lose meaning.

There are several strategies: fixed-size splitting (simple, but cuts sentences mid-thought), splitting on sentence or paragraph boundaries (respects structure, but produces uneven sizes), and semantic chunking (groups sentences by embedding similarity, at extra embedding cost).

Let's implement recursive chunking, the best balance of quality and simplicity: try large separators first (paragraph breaks), then fall back to smaller ones (lines, sentences, words) only when a piece is still too big.

@dataclass
class Chunk:
    """A text chunk with provenance."""
    text: str
    source: str
    chunk_index: int
    metadata: dict = field(default_factory=dict)


def chunk_document(
    doc: Document,
    max_chunk_size: int = 500,
    overlap: int = 50,
    separators: Optional[list[str]] = None,
) -> list[Chunk]:
    """Recursively split a document into overlapping chunks."""
    if separators is None:
        separators = ['\n\n', '\n', '. ', ' ']
    
    chunks = []
    text = doc.content.strip()
    
    if len(text) <= max_chunk_size:
        return [Chunk(text=text, source=doc.source, chunk_index=0, metadata=doc.metadata)]
    
    # Find the best separator that produces chunks under max size
    parts = _recursive_split(text, max_chunk_size, separators)
    
    # Merge small parts and add overlap
    merged = _merge_with_overlap(parts, max_chunk_size, overlap)
    
    for i, chunk_text in enumerate(merged):
        chunks.append(Chunk(
            text=chunk_text.strip(),
            source=doc.source,
            chunk_index=i,
            metadata={**doc.metadata, 'total_chunks': len(merged)},
        ))
    
    return chunks


def _recursive_split(text: str, max_size: int, separators: list[str]) -> list[str]:
    """Split text using the first separator that works."""
    if len(text) <= max_size:
        return [text]
    
    for sep in separators:
        parts = text.split(sep)
        if len(parts) > 1:
            # Re-attach the separator to each part so that merging chunks
            # later doesn't fuse text together (split() removes it)
            parts = [p + sep for p in parts[:-1]] + [parts[-1]]
            result = []
            for part in parts:
                if len(part) > max_size:
                    # Recursively split with remaining separators
                    remaining_seps = separators[separators.index(sep) + 1:]
                    if remaining_seps:
                        result.extend(_recursive_split(part, max_size, remaining_seps))
                    else:
                        # Last resort: hard split
                        for i in range(0, len(part), max_size):
                            result.append(part[i:i + max_size])
                else:
                    result.append(part)
            return result
    
    # No separator worked; hard split
    return [text[i:i + max_size] for i in range(0, len(text), max_size)]


def _merge_with_overlap(parts: list[str], max_size: int, overlap: int) -> list[str]:
    """Merge small parts together, maintaining overlap between chunks."""
    merged = []
    current = ""
    
    for part in parts:
        if len(current) + len(part) <= max_size:
            current += part
        else:
            if current:
                merged.append(current)
            # Start new chunk with overlap from previous
            if merged and overlap > 0:
                prev = merged[-1]
                overlap_text = prev[-overlap:] if len(prev) > overlap else prev
                current = overlap_text + part
            else:
                current = part
    
    if current:
        merged.append(current)
    
    return merged

Tuning Chunk Size

In practice, the defaults used in the code above (max_chunk_size=500 with overlap=50) are a solid starting point. Tune from there against your own retrieval-quality measurements rather than by feel: shrink chunks if retrieved context feels diluted, grow them if answers lose surrounding meaning.

Step 3: Generating Embeddings

Embeddings convert text into numerical vectors that capture meaning. Similar texts end up close together in vector space โ€” that's how we find relevant chunks later.
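To make "close together in vector space" concrete, here's a toy comparison with hand-made 3-dimensional vectors. The vectors and sentences are made up for illustration (real embeddings have hundreds or thousands of dimensions), but the math is identical:

```python
import numpy as np

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    a, b = np.array(a), np.array(b)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

# Pretend these are embeddings of three sentences
cat = [0.9, 0.1, 0.0]     # "The cat sat on the mat"
kitten = [0.8, 0.3, 0.1]  # "A kitten rested on the rug"
stocks = [0.0, 0.2, 0.9]  # "Stock prices fell sharply today"

print(cosine_similarity(cat, kitten))  # high: related sentences
print(cosine_similarity(cat, stocks))  # low: unrelated sentences
```

The real vectors come from an embedding model rather than by hand, which is what the API call below handles.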

import openai
import numpy as np
from time import sleep


def get_embeddings(
    texts: list[str],
    model: str = "text-embedding-3-small",
    batch_size: int = 100,
) -> list[list[float]]:
    """Generate embeddings for a list of texts using OpenAI API."""
    client = openai.OpenAI()  # Uses OPENAI_API_KEY env var
    all_embeddings = []
    
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        
        # Clean empty strings (API rejects them)
        batch = [t if t.strip() else " " for t in batch]
        
        response = client.embeddings.create(
            model=model,
            input=batch,
        )
        
        batch_embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(batch_embeddings)
        
        # Rate limiting
        if i + batch_size < len(texts):
            sleep(0.1)
    
    return all_embeddings

Cost note: text-embedding-3-small costs about $0.02 per million tokens. For 1000 document chunks averaging 400 characters each, that's roughly $0.003. Essentially free for most use cases.
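The arithmetic behind that estimate, using the common rule of thumb of roughly 4 characters per token for English text (the helper name and the ratio are illustrative; actual token counts vary by content):

```python
def estimate_embedding_cost(
    num_chunks: int,
    avg_chars_per_chunk: int,
    price_per_million_tokens: float = 0.02,  # text-embedding-3-small
) -> float:
    """Back-of-envelope embedding cost in dollars, assuming ~4 chars per token."""
    total_tokens = num_chunks * avg_chars_per_chunk / 4
    return total_tokens / 1_000_000 * price_per_million_tokens

# 1000 chunks x 400 chars is about 100k tokens: a fraction of a cent
print(f"${estimate_embedding_cost(1000, 400):.4f}")
```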

Step 4: Vector Store

Now we need somewhere to store embeddings and search them efficiently. For production, use a dedicated vector database (Qdrant, Pinecone, Weaviate, Chroma). For getting started, here's a simple NumPy-based store:

import json


class SimpleVectorStore:
    """In-memory vector store using cosine similarity."""
    
    def __init__(self):
        self.embeddings: list[list[float]] = []
        self.chunks: list[Chunk] = []
    
    def add(self, chunks: list[Chunk], embeddings: list[list[float]]):
        """Add chunks with their embeddings."""
        assert len(chunks) == len(embeddings)
        self.chunks.extend(chunks)
        self.embeddings.extend(embeddings)
    
    def search(self, query_embedding: list[float], top_k: int = 5) -> list[tuple[Chunk, float]]:
        """Find the most similar chunks to a query embedding."""
        if not self.embeddings:
            return []
        
        # Cosine similarity
        query = np.array(query_embedding)
        matrix = np.array(self.embeddings)
        
        # Normalize
        query_norm = query / (np.linalg.norm(query) + 1e-10)
        matrix_norm = matrix / (np.linalg.norm(matrix, axis=1, keepdims=True) + 1e-10)
        
        similarities = matrix_norm @ query_norm
        
        # Get top-k indices
        top_indices = np.argsort(similarities)[::-1][:top_k]
        
        results = []
        for idx in top_indices:
            results.append((self.chunks[idx], float(similarities[idx])))
        
        return results
    
    def save(self, path: str):
        """Persist to disk."""
        data = {
            'embeddings': self.embeddings,
            'chunks': [
                {'text': c.text, 'source': c.source, 'index': c.chunk_index, 'meta': c.metadata}
                for c in self.chunks
            ],
        }
        with open(path, 'w') as f:
            json.dump(data, f)
    
    def load(self, path: str):
        """Load from disk."""
        with open(path) as f:
            data = json.load(f)
        self.embeddings = data['embeddings']
        self.chunks = [
            Chunk(text=c['text'], source=c['source'], chunk_index=c['index'], metadata=c.get('meta', {}))
            for c in data['chunks']
        ]

This works fine for up to ~10,000 chunks. Beyond that, you'll want approximate nearest neighbor search (ANN) via a real vector database.

Step 5: The RAG Query Pipeline

Now we put it all together. When a user asks a question:

  1. Embed the question
  2. Search for relevant chunks
  3. Build a prompt with those chunks as context
  4. Send to LLM and return the answer

def rag_query(
    question: str,
    store: SimpleVectorStore,
    model: str = "gpt-4o-mini",
    top_k: int = 5,
    max_context_chars: int = 4000,
) -> dict:
    """Answer a question using RAG."""
    client = openai.OpenAI()
    
    # 1. Embed the question
    q_embedding = get_embeddings([question])[0]
    
    # 2. Retrieve relevant chunks
    results = store.search(q_embedding, top_k=top_k)
    
    # 3. Build context (respect token limits)
    context_parts = []
    total_chars = 0
    sources = set()
    
    for chunk, score in results:
        if score < 0.3:  # Skip low-relevance results
            continue
        if total_chars + len(chunk.text) > max_context_chars:
            break
        context_parts.append(f"[Source: {chunk.source}, Score: {score:.2f}]\n{chunk.text}")
        sources.add(chunk.source)
        total_chars += len(chunk.text)
    
    context = "\n\n---\n\n".join(context_parts)
    
    # 4. Build the prompt
    system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Rules:
- Only use information from the context below to answer
- If the context doesn't contain the answer, say "I don't have enough information to answer that"
- Cite the source when possible
- Be concise and direct"""

    user_prompt = f"""Context:
{context}

Question: {question}

Answer:"""
    
    # 5. Query the LLM
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.1,  # Low temperature for factual answers
        max_tokens=500,
    )
    
    return {
        'answer': response.choices[0].message.content,
        'sources': list(sources),
        'chunks_used': len(context_parts),
        'model': model,
    }

Step 6: Putting It All Together

Here's the complete indexing and querying flow:

def index_directory(path: str, store_path: str = "vectors.json") -> SimpleVectorStore:
    """Index all documents in a directory."""
    # Load
    docs = load_documents(path)
    print(f"📄 Loaded {len(docs)} documents")
    
    # Chunk
    all_chunks = []
    for doc in docs:
        chunks = chunk_document(doc, max_chunk_size=500, overlap=50)
        all_chunks.extend(chunks)
    print(f"✂️  Created {len(all_chunks)} chunks")
    
    # Embed
    texts = [c.text for c in all_chunks]
    embeddings = get_embeddings(texts)
    print(f"🧮 Generated {len(embeddings)} embeddings")
    
    # Store
    store = SimpleVectorStore()
    store.add(all_chunks, embeddings)
    store.save(store_path)
    print(f"💾 Saved to {store_path}")
    
    return store


# Index your knowledge base (one-time)
store = index_directory("./docs/")

# Ask questions
result = rag_query("How do I configure the database connection?", store)
print(f"\n🤖 {result['answer']}")
print(f"\n📚 Sources: {', '.join(result['sources'])}")

Production Tips

The pipeline above works, but here's what you'll want to add for production:

1. Hybrid Search

Combine vector similarity with keyword search (BM25). Vector search is great for semantic meaning, but sometimes you need exact term matching, especially for names, error codes, and technical identifiers.
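One simple way to fuse the two ranked result lists is Reciprocal Rank Fusion (RRF), which needs only each document's position in each list. A minimal sketch (the doc IDs are illustrative; k=60 is the constant from the original RRF paper):

```python
def reciprocal_rank_fusion(
    ranked_lists: list[list[str]],  # e.g. [vector_results, bm25_results], as doc IDs
    k: int = 60,
) -> list[str]:
    """Fuse multiple ranked lists into one, best-first."""
    scores: dict[str, float] = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking):
            # Documents ranked highly in any list accumulate more score
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["doc_a", "doc_b", "doc_c"]
bm25_hits = ["doc_c", "doc_a", "doc_d"]
print(reciprocal_rank_fusion([vector_hits, bm25_hits]))
```

Documents that appear near the top of both lists (here doc_a and doc_c) float to the front, without needing to reconcile the incompatible score scales of BM25 and cosine similarity.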

2. Re-ranking

After retrieving top-20 candidates, use a cross-encoder model to re-rank them. This significantly improves precision because cross-encoders see both query and document together, unlike bi-encoder embeddings.
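The re-ranking plumbing itself is tiny. Here's a sketch with an injectable scoring function; in production the scorer would wrap a cross-encoder model (the sentence-transformers library is one common source), but a toy word-overlap scorer keeps the example self-contained:

```python
from typing import Callable

def rerank(
    query: str,
    candidates: list[str],
    score_fn: Callable[[str, str], float],
    top_k: int = 5,
) -> list[str]:
    """Re-order retrieval candidates by a (query, document) scoring function."""
    scored = sorted(candidates, key=lambda doc: score_fn(query, doc), reverse=True)
    return scored[:top_k]

# Toy scorer: count of shared words (a stand-in for a real cross-encoder)
def word_overlap(query: str, doc: str) -> float:
    return len(set(query.lower().split()) & set(doc.lower().split()))

docs = ["configure the database connection", "install python", "database settings guide"]
print(rerank("how to configure database", docs, word_overlap, top_k=2))
```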

3. Query Expansion

Before searching, use the LLM to generate alternative phrasings of the query. "How do I connect to the DB?" might miss chunks about "database configuration" โ€” query expansion catches these.
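A sketch of the merge step, with the LLM paraphrase generation and the search call left as injected pieces (the function names and toy index are illustrative, not a fixed API):

```python
def search_with_expansion(
    queries: list[str],   # original question plus LLM-generated paraphrases
    search_fn,            # callable: query -> list of (doc_id, score) pairs
    top_k: int = 5,
) -> list[tuple[str, float]]:
    """Run every phrasing through search, keeping each doc's best score."""
    best: dict[str, float] = {}
    for q in queries:
        for doc_id, score in search_fn(q):
            best[doc_id] = max(best.get(doc_id, 0.0), score)
    return sorted(best.items(), key=lambda kv: kv[1], reverse=True)[:top_k]

# Toy search function standing in for embed-and-search
fake_index = {
    "how do i connect to the db": [("chunk_1", 0.6)],
    "database configuration": [("chunk_2", 0.8), ("chunk_1", 0.5)],
}
queries = ["how do i connect to the db", "database configuration"]
print(search_with_expansion(queries, lambda q: fake_index.get(q, [])))
```

Here the paraphrase surfaces chunk_2, which the original phrasing would have missed entirely.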

4. Chunk Metadata

Store rich metadata with each chunk: document title, section headers, creation date, author. Use this for filtering ("only search docs from the last 6 months") and for better citation in answers.
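A sketch of metadata filtering applied to search results, using plain dicts for the metadata (the field names created and author are examples, not a fixed schema):

```python
from datetime import date
from typing import Optional

def filter_results(
    results: list[tuple[dict, float]],  # (chunk_metadata, score) pairs
    since: Optional[date] = None,
    author: Optional[str] = None,
) -> list[tuple[dict, float]]:
    """Keep only results whose metadata passes the given filters."""
    kept = []
    for meta, score in results:
        if since and meta.get("created", date.min) < since:
            continue  # too old
        if author and meta.get("author") != author:
            continue  # wrong author
        kept.append((meta, score))
    return kept

results = [
    ({"author": "sam", "created": date(2024, 11, 1)}, 0.9),
    ({"author": "alex", "created": date(2023, 1, 5)}, 0.8),
]
print(filter_results(results, since=date(2024, 1, 1)))
```

Real vector databases push these filters down into the index itself, which is much faster than post-filtering, but the idea is the same.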

5. Evaluation

Build a test set of question-answer pairs. Measure retrieval recall (did we find the right chunks?) and answer quality (did the LLM produce correct answers?). Without measurement, you're tuning blind.
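A minimal retrieval-recall harness over such a test set might look like this (search_fn and the toy data are placeholders for your real pipeline):

```python
def retrieval_recall_at_k(
    test_set: list[tuple[str, set[str]]],  # (question, ids of chunks that answer it)
    search_fn,                             # callable: question -> ranked list of chunk ids
    k: int = 5,
) -> float:
    """Fraction of questions where at least one relevant chunk is in the top k."""
    hits = 0
    for question, relevant_ids in test_set:
        retrieved = set(search_fn(question)[:k])
        if retrieved & relevant_ids:
            hits += 1
    return hits / len(test_set)

# Toy example with a canned search function
canned = {"q1": ["c1", "c9"], "q2": ["c4", "c5"]}
tests = [("q1", {"c1"}), ("q2", {"c7"})]
print(retrieval_recall_at_k(tests, lambda q: canned[q], k=2))  # 0.5
```

Track this number whenever you change chunk size, overlap, or top_k; if recall drops, answer quality will follow.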

Get the Full Pipeline (+ 49 More Scripts)

This RAG pipeline is one of 50+ production-ready Python scripts in the AI Agent Toolkit. Includes price monitors, batch processors, data cleaners, web scrapers, and 30+ curated prompts.

Get the Toolkit ($19)

Instant download · All scripts tested · Lifetime updates

Common Pitfalls

A few failure modes come up again and again:

  1. Chunks that are too large or too small: diluted context on one end, lost meaning on the other (see Step 2)
  2. No source tracking: without provenance on each chunk, you can't cite answers
  3. Sticking with the in-memory store past ~10,000 chunks instead of moving to a real vector database
  4. Relying on vector search alone when queries contain exact names or error codes that need keyword matching
  5. Shipping without an evaluation set, so every tuning change is a guess

Wrapping Up

Building a RAG pipeline is straightforward once you understand the components. The hard part isn't the code; it's tuning the chunking strategy and evaluating quality for your specific use case.

Start simple (this article's code works), measure what matters (retrieval recall, answer accuracy), and iterate. Don't reach for a framework until you understand what each piece does.

Want to dive deeper? The AI Agent Toolkit includes the complete RAG implementation with error handling, retry logic, caching, and support for multiple embedding providers. Plus 49 other scripts for automation tasks you'll actually use.