Building RAGenius: A Production-Ready RAG System with FastAPI, Azure OpenAI & ChromaDB

  • MyrinNew
    Senior Member
    • Feb 2024
    • 5175


    🧠 Building RAGenius: A Production-Ready RAG System

    Have you ever wanted to chat with your documents using AI? Whether it's PDFs, Excel spreadsheets, or JSON files - imagine having an intelligent assistant that can answer questions based on your entire document collection. That's exactly what I built with RAGenius!


    🤔 What is RAG?

    Retrieval-Augmented Generation (RAG) is a technique that combines the power of large language models (LLMs) with your own data. Instead of relying solely on the model's training data, RAG:

    1. Retrieves relevant information from your documents
    2. Augments the LLM prompt with this context
    3. Generates accurate, contextual answers


    This approach grounds answers in retrieved text, which tends to reduce hallucinations, and lets LLMs answer questions about your specific domain knowledge.
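
    To make the three steps concrete, here is a toy sketch that runs without any external service. Everything in it is an assumption for illustration: the word-count "embedding", the function names, and the placeholder generate() step are not RAGenius code, which uses Azure OpenAI embeddings and GPT-4 instead.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts (a stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(question: str, chunks: list[str], top_k: int = 2) -> list[str]:
    """Step 1: rank chunks by similarity to the question and keep the best top_k."""
    q = embed(question)
    ranked = sorted(chunks, key=lambda c: cosine(q, embed(c)), reverse=True)
    return ranked[:top_k]


def augment(question: str, context_chunks: list[str]) -> str:
    """Step 2: place the retrieved chunks in the prompt ahead of the question."""
    context = "\n\n".join(context_chunks)
    return f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"


def generate(prompt: str) -> str:
    """Step 3: placeholder for the LLM call; it only reports the prompt size."""
    return f"[LLM would answer here, given a {len(prompt)}-character prompt]"


chunks = [
    "ChromaDB stores embeddings on disk.",
    "FastAPI serves the REST endpoints.",
    "Overlap keeps context across chunk boundaries.",
]
question = "Where are embeddings stored?"
top = retrieve(question, chunks, top_k=1)
prompt = augment(question, top)
print(generate(prompt))
```

    Only the first chunk shares a word with the question, so it is the one that ends up in the prompt.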


    💡 Why I Built RAGenius

    While experimenting with various RAG implementations, I noticed most tutorials focused on simple, single-file demos. I wanted something more:


    • Production-ready with proper error handling
    • Multi-format support (PDF, Excel, JSON, DOCX, CSV, TXT)
    • Streaming responses for better UX
    • REST API for easy integration
    • Incremental updates without rebuilding the entire index


    Thus, RAGenius was born! 🎉

    🏗️ Architecture Overview

    RAGenius follows a clean, modular architecture:






    ┌─────────────────┐
    │    Documents    │  (PDF, Excel, JSON, etc.)
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │   Data Loader   │  (Multi-format processing)
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │    Chunking     │  (Smart text splitting)
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │  Azure OpenAI   │  (Generate embeddings)
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │    ChromaDB     │  (Vector storage)
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │   RAG Engine    │  (Query + Generate)
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │     FastAPI     │  (REST API)
    └─────────────────┘







    🛠️ Tech Stack

    • FastAPI: Lightning-fast API framework
    • LangChain: Document processing and LLM orchestration
    • ChromaDB: Vector database for embeddings
    • Azure OpenAI: GPT-4 and embedding models
    • Python 3.10+: Core language
    • UV: Modern Python package manager


    🚀 Key Features Breakdown

    1️⃣ Multi-Format Document Processing

    One of the coolest features is the ability to handle various file types seamlessly:






    from src.data_loader import load_all_documents

    # Automatically detects and loads all supported formats
    docs = load_all_documents("data")
    print(f"Loaded {len(docs)} documents")







    Under the hood, data_loader.py maps each file extension to a loader:






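    from langchain_community.document_loaders import (
        CSVLoader, Docx2txtLoader, JSONLoader,
        PyPDFLoader, TextLoader, UnstructuredExcelLoader,
    )

    # Each value is called with a file path and returns a loader instance.
    LOADER_MAP = {
        ".pdf": PyPDFLoader,
        ".txt": lambda path: TextLoader(path, encoding="utf-8"),
        ".csv": CSVLoader,
        ".docx": Docx2txtLoader,
        # JSONLoader needs jq_schema; "." (the whole document) is an assumed default
        ".json": lambda path: JSONLoader(path, jq_schema=".", text_content=False),
        ".xlsx": UnstructuredExcelLoader,
    }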
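
    A mapping like this can drive a directory walk. The sketch below shows one way that might look, using only standard-library readers so it runs anywhere. The names SKETCH_LOADERS, load_directory and the three load_* helpers are hypothetical; the real load_all_documents may be organised differently and returns LangChain documents rather than plain strings.

```python
import csv
import json
import logging
import tempfile
from pathlib import Path

logger = logging.getLogger("loader_sketch")


def load_txt(path: Path) -> list[str]:
    return [path.read_text(encoding="utf-8")]


def load_json(path: Path) -> list[str]:
    # Serialise the parsed JSON back to text so it can be chunked like any other document.
    return [json.dumps(json.loads(path.read_text(encoding="utf-8")), indent=2)]


def load_csv(path: Path) -> list[str]:
    # One text document per row, e.g. "name: Ada, role: engineer".
    with path.open(newline="", encoding="utf-8") as fh:
        return [", ".join(f"{k}: {v}" for k, v in row.items()) for row in csv.DictReader(fh)]


# Hypothetical stand-in for LOADER_MAP, limited to formats the standard library can read.
SKETCH_LOADERS = {".txt": load_txt, ".json": load_json, ".csv": load_csv}


def load_directory(data_dir: str) -> list[str]:
    """Walk data_dir, dispatch on file extension, and skip files that fail to load."""
    docs: list[str] = []
    for path in sorted(Path(data_dir).rglob("*")):
        loader = SKETCH_LOADERS.get(path.suffix.lower())
        if not path.is_file() or loader is None:
            continue  # directories and unsupported extensions are ignored
        try:
            docs.extend(loader(path))
        except Exception as exc:  # one bad file should not abort the whole run
            logger.warning("Skipping %s: %s", path, exc)
    return docs


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "notes.txt").write_text("hello", encoding="utf-8")
    Path(tmp, "people.csv").write_text("name,role\nAda,engineer\n", encoding="utf-8")
    Path(tmp, "broken.json").write_text("{not valid json", encoding="utf-8")
    Path(tmp, "image.png").write_bytes(b"\x89PNG")
    loaded = load_directory(tmp)

print(loaded)
```

    The malformed JSON file is logged and skipped, and the PNG is ignored because its extension has no loader, so the run still returns the two readable documents.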







    2️⃣ Smart Document Chunking

    Not all text should be split the same way. RAGenius uses RecursiveCharacterTextSplitter with configurable parameters:






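    from langchain_text_splitters import RecursiveCharacterTextSplitter

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,       # maximum characters per chunk
        chunk_overlap=200,     # characters shared between neighbouring chunks
        length_function=len,
        separators=["\n\n", "\n", " ", ""],  # tried in order: paragraphs, lines, words, characters
    )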







    Why overlap? It keeps the text near a chunk boundary in both neighbouring chunks, so a sentence that straddles the boundary is less likely to be cut off - crucial for maintaining semantic coherence!
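
    The effect of overlap is easiest to see with a plain sliding window. The function below is a simplified illustration, not the splitter RAGenius uses: chunk_text is a hypothetical helper that cuts at fixed character offsets and ignores separators, which RecursiveCharacterTextSplitter does not.

```python
def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """Fixed-size sliding window: each chunk repeats the last chunk_overlap characters
    of the previous one. Unlike RecursiveCharacterTextSplitter, it ignores separators."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


text = "".join(str(i % 10) for i in range(2500))
chunks = chunk_text(text)
print([len(c) for c in chunks])             # [1000, 1000, 900]
print(chunks[0][-200:] == chunks[1][:200])  # True
```

    With a 1000-character window and 200 characters of overlap the window advances 800 characters at a time, so a 2500-character text yields three chunks, and the tail of each chunk reappears at the head of the next.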


    3️⃣ Vector Storage with ChromaDB

    ChromaDB provides persistent, efficient vector storage:






    import chromadb

    class ChromaVectorStore:
        def __init__(self, persist_directory="chromadb_store"):
            # PersistentClient writes the index to disk under persist_directory
            self.client = chromadb.PersistentClient(path=persist_directory)
            self.collection = self.client.get_or_create_collection(
                name="pdf_documents",
                metadata={"description": "PDF embeddings using Azure OpenAI"}
            )







    Key benefit: Your embeddings persist across restarts - no need to re-process documents!


    4️⃣ Streaming RAG Responses

    Modern UIs demand real-time feedback. RAGenius supports token-by-token streaming:






    async def stream_query(self, question: str, top_k: int = 5):
        """Async generator for true token streaming"""
        results = self.vectorstore.query(question, top_k=top_k)
        docs = results.get("documents", [[]])[0]

        if not docs:
            yield "No relevant context found."
            return

        context = "\n\n".join(docs)
        prompt = self._build_prompt(context, question)

        # Relay each chunk from the LLM as soon as it arrives
        async for chunk in self.llm.astream([HumanMessage(content=prompt)]):
            token = getattr(chunk, "content", str(chunk))
            yield token
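
    If you want to try the control flow of stream_query without Azure credentials, a fake token source is enough. In this sketch fake_llm_stream, stream_answer and collect are all hypothetical names; the fake stream simply echoes the question word by word in place of self.llm.astream.

```python
import asyncio
from typing import AsyncIterator


async def fake_llm_stream(prompt: str) -> AsyncIterator[str]:
    """Stand-in for self.llm.astream(...): yields one word at a time."""
    for word in f"Echo: {prompt}".split(" "):
        await asyncio.sleep(0)  # hand control back to the event loop, as a network wait would
        yield word + " "


async def stream_answer(question: str, docs: list[str]) -> AsyncIterator[str]:
    """Same control flow as stream_query: bail out early, otherwise relay tokens."""
    if not docs:
        yield "No relevant context found."
        return
    async for token in fake_llm_stream(question):
        yield token


async def collect(question: str, docs: list[str]) -> list[str]:
    """Drain the generator the way an API handler or a test would."""
    return [token async for token in stream_answer(question, docs)]


tokens = asyncio.run(collect("what is RAG", ["some retrieved chunk"]))
empty = asyncio.run(collect("what is RAG", []))
print(tokens)
print(empty)
```

    Because the generator yields as it goes, a caller can forward each token immediately instead of waiting for the full answer.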







    5️⃣ RESTful API with FastAPI

    Three main endpoints power the system:


    📤 Upload Documents






    curl -X POST "http://localhost:8000/rag/upload" \
      -F "files=@document.pdf" \
      -F "files=@spreadsheet.xlsx"







    🔍 Basic Query






    curl -X POST "http://localhost:8000/rag/basic" \
      -H "Content-Type: application/json" \
      -d '{"query": "What is the attention mechanism?", "top_k": 5}'







    🌊 Streaming Query






    curl -X POST "http://localhost:8000/rag/stream" \
      -H "Content-Type: application/json" \
      -d '{"query": "Explain transformers", "top_k": 3}' \
      --no-buffer







    🎯 The RAG Pipeline in Action

    Here's what happens when you ask a question:

    1. Query Embedding: Your question is converted to a vector using Azure OpenAI
    2. Similarity Search: ChromaDB finds the top-k most relevant document chunks
    3. Context Building: Retrieved chunks are combined into a context window
    4. Prompt Construction: The context and question are formatted into a prompt
    5. LLM Generation: GPT-4 generates an answer based on the provided context
    6. Streaming Response: Tokens are streamed back to the client in real-time




    def query(self, question: str, top_k: int = 5):
        # Step 1 & 2: Retrieve relevant context
        results = self.vectorstore.query(question, top_k=top_k)
        docs = results.get("documents", [[]])[0]

        # Step 3: Build context
        context = "\n\n".join(docs)

        # Step 4: Construct prompt
        prompt = f"""
        Use the following context to answer the question.

        Context:
        {context}

        Question: {question}

        Answer:
        """

        # Step 5: Generate response
        response = self.llm.invoke([HumanMessage(content=prompt)])
        return {"answer": response.content}
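
    The streaming method calls self._build_prompt, which isn't shown in this post. Below is one possible shape for such a helper, offered as a sketch only: build_prompt, its max_chars budget, the numbered chunk labels and the "say so" instruction are my assumptions, and it takes a list of chunks rather than the pre-joined string used above.

```python
def build_prompt(context_chunks: list[str], question: str, max_chars: int = 6000) -> str:
    """Hypothetical prompt builder: number the chunks, cap total context size,
    and tell the model what to do when the context is insufficient."""
    parts: list[str] = []
    used = 0
    for i, chunk in enumerate(context_chunks, start=1):
        entry = f"[{i}] {chunk.strip()}"
        if used + len(entry) > max_chars:
            break  # drop lower-ranked chunks rather than overflow the budget
        parts.append(entry)
        used += len(entry)
    context = "\n\n".join(parts) if parts else "(no context retrieved)"
    return (
        "Use the following context to answer the question. "
        "If the context does not contain the answer, say so.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\n\n"
        "Answer:"
    )


prompt = build_prompt(["Attention weighs tokens by relevance.", "x" * 10_000], "What is attention?")
print(prompt)
```

    Capping the context keeps an unusually long chunk from crowding out the question, at the cost of dropping whatever did not fit.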







    📊 Performance Optimizations

    Chunking Strategy

    • Chunk Size: 1000 characters - balances context vs. precision
    • Overlap: 200 characters - maintains semantic continuity
    • Smart Separators: Prioritizes paragraph breaks over word breaks


    Embedding Efficiency

    • Batch Processing: Multiple chunks embedded in single API calls
    • Persistent Storage: Embeddings cached in ChromaDB
    • Incremental Updates: Add new documents without re-embedding existing ones
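
    Batching and incremental updates can both hang off deterministic chunk IDs. The sketch below uses a plain dict in place of the ChromaDB collection and a fake embedding function in place of the Azure OpenAI call; chunk_id, fake_embed and add_incrementally are hypothetical names, and I am not claiming this is how RAGenius derives its IDs.

```python
import hashlib


def chunk_id(source: str, text: str) -> str:
    """Deterministic ID: the same chunk from the same file always hashes to the same value."""
    return hashlib.sha256(f"{source}\x00{text}".encode("utf-8")).hexdigest()[:16]


def fake_embed(texts: list[str]) -> list[list[float]]:
    """Stand-in for one batched embedding API call."""
    return [[float(len(t))] for t in texts]


def add_incrementally(store: dict, source: str, chunks: list[str], batch_size: int = 2) -> int:
    """Embed and store only chunks whose ID is not yet in the store; return how many were added."""
    new = {chunk_id(source, c): c for c in chunks if chunk_id(source, c) not in store}
    items = list(new.items())
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        vectors = fake_embed([text for _, text in batch])  # one call per batch, not per chunk
        for (cid, text), vec in zip(batch, vectors):
            store[cid] = {"text": text, "embedding": vec, "source": source}
    return len(items)


store: dict = {}
first = add_incrementally(store, "a.txt", ["alpha", "beta", "gamma"])
second = add_incrementally(store, "a.txt", ["alpha", "beta", "gamma", "delta"])
print(first, second, len(store))  # 3 1 4
```

    On the second call only "delta" is new, so only one chunk is embedded. With a real ChromaDB collection the membership check would be a lookup by ID before adding, rather than a dict test.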


    Query Optimization

    • Top-K Selection: Default k=5 balances relevance and token usage
    • Temperature Control: 0.7 favors more natural phrasing; lower values give more deterministic answers that stay closer to the context
    • Async Operations: Non-blocking streaming for better UX


    🐛 Challenges & Solutions

    Challenge 1: JSONLoader Complexity

    Problem: JSONLoader requires a jq_schema argument that the other loaders do not take, which complicates multi-format support.


    Solution: Implemented dynamic loader selection with custom error handling:






    from pathlib import Path

    def dynamic_loader(file_path: str):
        """Pick a loader from LOADER_MAP based on the file extension."""
        ext = Path(file_path).suffix.lower()
        loader_cls = LOADER_MAP.get(ext)
        if not loader_cls:
            raise ValueError(f"❌ Unsupported file type: {file_path}")
        return loader_cls(file_path)







    Challenge 2: Streaming with FastAPI

    Problem: Server-Sent Events (SSE) format required careful handling.


    Solution: Used StreamingResponse with proper headers:






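    from fastapi.responses import StreamingResponse

    # Inside the streaming endpoint:
    return StreamingResponse(
        stream_response(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",   # do not cache the stream
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"      # ask nginx not to buffer the response
        }
    )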
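
    The "careful handling" is mostly about framing: in SSE each event is a run of "data:" lines ended by a blank line, so a newline inside a token has to be split across several "data:" lines. Here is a minimal sketch of that framing. The to_sse helper and the final "done" event are assumptions of mine, not necessarily what stream_response() emits.

```python
from typing import Iterable, Iterator


def to_sse(tokens: Iterable[str]) -> Iterator[str]:
    """Wrap each token in an SSE event: one 'data:' line per line of payload,
    followed by a blank line that terminates the event."""
    for token in tokens:
        lines = token.split("\n")  # a newline inside a token must not end the event early
        yield "".join(f"data: {line}\n" for line in lines) + "\n"
    # Hypothetical end-of-stream marker so clients know when to stop reading.
    yield "event: done\ndata: [DONE]\n\n"


events = list(to_sse(["Hello", " world", "line1\nline2"]))
for event in events:
    print(repr(event))
```

    Note the two spaces in the second event ("data:  world"): SSE clients strip only one space after the colon, so the token's own leading space survives.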







    Challenge 3: File Upload Memory Management

    Problem: Large file uploads could cause memory issues.


    Solution: Temporary directory with automatic cleanup:






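    import os
    import shutil
    import uuid

    temp_dir = os.path.join(DATA_DIR, f"temp_{uuid.uuid4().hex[:8]}")
    os.makedirs(temp_dir, exist_ok=True)
    try:
        # Save the uploaded files into temp_dir, then process them
        docs = load_all_documents(temp_dir)
        vectorstore.add_documents(docs)
    finally:
        # Always remove the temporary directory, even if processing fails
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)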
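
    On the memory side, the key is to copy each upload to disk in pieces rather than reading it whole. The sketch below shows that idea with the standard library only; save_upload and process_uploads are hypothetical helpers, and in-memory BytesIO objects stand in for the file-like object that a FastAPI upload exposes.

```python
import io
import shutil
import tempfile
from pathlib import Path
from typing import BinaryIO


def save_upload(stream: BinaryIO, filename: str, dest_dir: str, chunk_bytes: int = 1024 * 1024) -> Path:
    """Copy an upload to disk in fixed-size pieces so the whole file is never held in memory."""
    # .name drops any directory part of the client-supplied filename
    target = Path(dest_dir) / Path(filename).name
    with target.open("wb") as out:
        shutil.copyfileobj(stream, out, length=chunk_bytes)
    return target


def process_uploads(uploads: dict[str, BinaryIO]) -> list[str]:
    """Stage uploads in a temporary directory that is removed even if processing raises."""
    with tempfile.TemporaryDirectory(prefix="rag_upload_") as temp_dir:
        saved = [save_upload(stream, name, temp_dir) for name, stream in uploads.items()]
        # Placeholder for load_all_documents(temp_dir) + vectorstore.add_documents(docs).
        return [f"{p.name}:{p.stat().st_size}" for p in saved]


result = process_uploads({
    "report.pdf": io.BytesIO(b"x" * 3_000_000),
    "../../etc/passwd": io.BytesIO(b"abc"),
})
print(result)  # ['report.pdf:3000000', 'passwd:3']
```

    The context manager plays the same role as the try/finally above, and taking only the base name of the uploaded filename keeps a crafted path from escaping the temporary directory.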







    🎓 Lessons Learned

    1. Modular Design Pays Off: Separating concerns (loading, embedding, storage, querying) made debugging and testing much easier.
    2. Async is Essential: For streaming responses and file processing, async/await keeps the server responsive while it waits on the LLM and on I/O.
    3. Error Handling Matters: Production systems need comprehensive logging and graceful error recovery.
    4. Chunk Overlap is Critical: Without overlap, important context can be lost at boundaries, leading to incomplete answers.
    5. Persistent Storage Rocks: Because ChromaDB persists the index to disk, nothing has to be re-indexed after a restart.


    🔮 Future Enhancements

    Here's what's on the roadmap:
    • [ ] Multi-LLM Support: OpenAI, Anthropic Claude, Cohere
    • [ ] Web UI: React-based interface for document management
    • [ ] Advanced Filtering: Metadata-based search refinement
    • [ ] Cloud Storage Integration: S3, Azure Blob, Google Cloud Storage
    • [ ] Conversation Memory: Multi-turn dialogue support
    • [ ] Fine-tuned Embeddings: Domain-specific embedding models
    • [ ] Kubernetes Manifests: Production-ready deployment configs


    🚀 Getting Started

    Want to try RAGenius? It's super easy:






    # Clone the repository
    git clone https://github.com/AquibPy/RAGenius.git
    cd RAGenius

    # Install dependencies (using UV)
    uv sync

    # Set up environment variables
    cp .env.example .env
    # Add your Azure OpenAI credentials

    # Start the server
    uvicorn app:app --reload

    # Visit http://localhost:8000/docs for API documentation







    📝 Example Usage

    Python Script





    from src.search import RAGEngine

    # Initialize
    rag = RAGEngine()

    # Query
    result = rag.query(
        "What is the attention mechanism in transformers?",
        top_k=5
    )

    print(result["answer"])







    CLI





    python main.py \
      --query "Explain BERT architecture" \
      --mode streaming







    API





    import requests

    response = requests.post(
        "http://localhost:8000/rag/basic",
        json={"query": "What is machine learning?", "top_k": 3}
    )

    print(response.json()["answer"])







    🎉 Conclusion

    Building RAGenius has been an incredible learning experience. It combines cutting-edge AI technologies with practical software engineering to create a tool that's actually useful in production environments.


    The beauty of RAG systems is that they make LLMs grounded in reality - answering questions based on YOUR data, not just internet-scale training data. Whether you're building internal knowledge bases, customer support systems, or research tools, RAG is the way forward.


    🔗 Links



    💬 Let's Connect!

    I'd love to hear your thoughts and ideas! Feel free to:
    • ⭐ Star the repo if you find it useful
    • 🐛 Report bugs or request features via GitHub Issues
    • 🤝 Contribute through Pull Requests
    • 💬 Connect with me on X or LinkedIn


    Have you built any RAG systems? What challenges did you face? Drop a comment below! 👇





    Happy coding! 🚀



