Building a Supermarket Data Pipeline

    Posted by MyrinNew (Senior Member), Feb 2024


    How I Built an Automated System That Turns Messy Sales Data Into Business Gold




    Ever wonder how your favorite supermarket knows exactly when to restock the shelves, which products are flying off the racks, or why they always seem to have your favorite snacks in stock? The secret lies in data pipelines, and I built one from scratch.





    The Problem: Data Drowning

    Imagine you're the manager of a busy supermarket (e.g., Naivas). Every single day, thousands of transactions flow through your registers: customers buying milk, bread, snacks and cleaning supplies. Each transaction generates a line of data: who bought what, how much they paid, and how they paid.


    Now here's the challenge: all this data sits in a messy Google spreadsheet that cashiers update in real time. It's like having a river of gold nuggets flowing past you with no way to catch them.


    The questions that keep you up at night:
    • Which products are selling the most?
    • What payment methods do customers prefer?
    • Are there duplicate transactions messing up your accounting?
    • How can you make this data useful for reports AND for your mobile app?


    This is exactly the problem I solved with the Supermarket ETL Pipeline.





    The Solution: An Automated Data Factory

    Think of my solution like a water treatment plant for data:


    | Stage | Water treatment plant | Data pipeline |
    |---|---|---|
    | Extract | Pumping water from the river | Pulling raw sales data from Google Sheets |
    | Transform | Filtering out dirt and impurities | Removing duplicates and rows with missing IDs |
    | Load | Storing clean water in tanks | Saving clean data to PostgreSQL & MongoDB |


    [Screenshot: the source Google Sheet with raw transaction data (columns id, quantity, product_name, total_amount, payment_method, customer_type)]





    How It Works

    Step 1: Extraction ("Fishing for Data")

    My pipeline starts by reaching out to Google Sheets; think of it as casting a fishing net into a lake. The spreadsheet contains the raw transaction records: every purchase, every customer, every payment.






    The Pipeline says: "Hey Google, give me all the sales data!"
    Google responds: "Here are 1,000 rows of transactions!"







    Why Google Sheets? Because it's where many real businesses keep their data: it's accessible, shareable, and doesn't require expensive software.


    [Screenshot: terminal output with the "Starting extraction from Google Sheets" and "Extracted X rows" log messages]





    Step 2: Transformation ("The Car Wash for Data")

    Raw data is messy. Imagine every car that rolls into a car wash covered in mud, leaves and bird droppings. The transformation stage is my car wash: it takes dirty data and makes it sparkle.


    What gets cleaned:


    | Problem | What happens |
    |---|---|
    | Duplicate transactions (same ID twice) | Removed automatically |
    | Missing transaction IDs | Rows dropped |
    | Unnecessary columns | Only essential fields kept |


    The pipeline keeps only what matters:
    • id — Unique transaction identifier
    • quantity — How many items purchased
    • product_name — What was bought
    • total_amount — How much was paid
    • payment_method — Cash, card, or digital
    • customer_type — Member or regular customer
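    To make the cleaning rules above concrete, here is a small sketch that applies them to a made-up batch of rows and counts what each rule removed, the kind of numbers that are handy to print in the transformation logs. The helper name clean_with_report and the sample values are hypothetical; only the six column names come from the pipeline.

```python
import pandas as pd

# The six fields the pipeline keeps
REQUIRED_COLUMNS = ["id", "quantity", "product_name",
                    "total_amount", "payment_method", "customer_type"]


def clean_with_report(df):
    """Apply the three cleaning rules and count what each one removed."""
    kept = df[REQUIRED_COLUMNS].copy()            # rule: keep only essential fields
    report = {"columns_dropped": df.shape[1] - kept.shape[1]}

    before = len(kept)
    kept = kept.dropna(subset=["id"])             # rule: no transaction ID, no row
    report["missing_id_rows"] = before - len(kept)

    before = len(kept)
    kept = kept.drop_duplicates(subset=["id"])    # rule: keep the first copy of each ID
    report["duplicate_rows"] = before - len(kept)

    return kept, report


# Made-up sample: one duplicated ID, one missing ID, one extra column
raw = pd.DataFrame({
    "id": [101, 102, 102, None],
    "quantity": [2, 1, 1, 3],
    "product_name": ["Milk", "Bread", "Bread", "Soap"],
    "total_amount": [120.0, 65.0, 65.0, 250.0],
    "payment_method": ["Cash", "Card", "Card", "Cash"],
    "customer_type": ["Member", "Regular", "Regular", "Member"],
    "cashier_note": ["", "", "entered twice", ""],
})
clean, report = clean_with_report(raw)
print(report)  # {'columns_dropped': 1, 'missing_id_rows': 1, 'duplicate_rows': 1}
```

    Dropping missing IDs before de-duplicating gives the same result as the reverse order, but it keeps the two counts easy to interpret.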


    [Screenshot: transformation logs]




    Step 3: Loading ("Two Warehouses, Two Purposes")

    Here's where it gets interesting. Instead of storing data in just one place, I built a dual-database strategy. Think of it like having two different storage facilities:


    PostgreSQL: The Library

    PostgreSQL is like a meticulously organized library. Every book (data record) has its place, follows strict rules, and can be cross-referenced with other books easily.


    Best for:
    • Financial reports ("How much revenue did we make last month?")
    • Accounting audits (transactions and constraints help protect data integrity)
    • Complex queries ("Show me all cash transactions over $100 from member customers")
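    The "complex query" example above maps onto a short SQL statement. This sketch runs it against Python's built-in sqlite3 in memory so it is self-contained; against PostgreSQL the SQL is the same, though drivers such as psycopg2 use %s placeholders instead of ?. The sample rows, the function name and the spellings 'Cash' and 'Member' are assumptions about how the sheet records these values.

```python
import sqlite3

# Cash purchases by members above a threshold, biggest first.
# Column names follow the pipeline's "transactions" table.
QUERY = """
    SELECT id, product_name, total_amount
    FROM transactions
    WHERE payment_method = ?
      AND customer_type = ?
      AND total_amount > ?
    ORDER BY total_amount DESC
"""


def big_member_cash_sales(conn, min_amount=100):
    """Run the filter with bound parameters (values never pasted into the SQL)."""
    return conn.execute(QUERY, ("Cash", "Member", min_amount)).fetchall()


# In-memory stand-in for the PostgreSQL table, with made-up rows
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transactions (id INTEGER, quantity INTEGER, product_name TEXT, "
    "total_amount REAL, payment_method TEXT, customer_type TEXT)"
)
conn.executemany("INSERT INTO transactions VALUES (?, ?, ?, ?, ?, ?)", [
    (1, 2, "Rice 5kg", 150.0, "Cash", "Member"),
    (2, 1, "Bread", 60.0, "Cash", "Member"),
    (3, 3, "Cooking oil", 210.0, "Card", "Member"),
    (4, 1, "Detergent", 120.0, "Cash", "Regular"),
])
print(big_member_cash_sales(conn))  # [(1, 'Rice 5kg', 150.0)]
```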


    MongoDB: The Flexible Warehouse

    MongoDB is like a modern warehouse with adjustable shelving. You can store items of different shapes and sizes without reorganizing everything.


    Best for:
    • Mobile app backends (JSON-friendly)
    • Rapid prototyping ("Let's quickly add a new field!")
    • Analytics dashboards (flexible data exploration)
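    On the MongoDB side, the same kind of question is expressed as a plain Python dict, which is part of what makes it JSON-friendly. This is a sketch: the documents are made up, the loyalty_points field is a hypothetical extra that only one document carries, and the PyMongo calls are shown as comments because they need a live server.

```python
# Made-up documents: the second has a field the first lacks, and MongoDB
# accepts both in one collection without a schema migration.
docs = [
    {"id": 101, "product_name": "Milk", "total_amount": 120.0,
     "payment_method": "Cash", "customer_type": "Member"},
    {"id": 102, "product_name": "Bread", "total_amount": 65.0,
     "payment_method": "Card", "customer_type": "Regular", "loyalty_points": 6},
]


def big_member_cash_filter(min_amount=100):
    """PyMongo filter document: cash purchases by members above min_amount."""
    return {
        "payment_method": "Cash",
        "customer_type": "Member",
        "total_amount": {"$gt": min_amount},  # MongoDB "greater than" operator
    }


# With a live connection (not run here):
#   collection.insert_many(docs)
#   collection.find(big_member_cash_filter(), {"_id": 0, "product_name": 1})
```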


    [Screenshot: Docker containers running]

    [Screenshot: PostgreSQL data view]

    [Screenshot: MongoDB data view]




    How It Works (The Technical Deep-Dive)

    For my fellow engineers, let's pop the hood and look at the engine.


    Architecture Overview





```text
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Google Sheets  │────▶│   Python ETL    │────▶│   PostgreSQL    │
│  (Data Source)  │     │   (Container)   │     │  (Relational)   │
└─────────────────┘     │                 │     └─────────────────┘
                        │  • Extract      │
                        │  • Transform    │     ┌─────────────────┐
                        │  • Load         │────▶│     MongoDB     │
                        └─────────────────┘     │   (Document)    │
                                                └─────────────────┘
```







    [Screenshot: project folder structure]





    The Modular Design Philosophy

    Instead of one giant script, I split the pipeline into specialized modules, like having different specialists in a hospital:


    | Module | Responsibility | Hospital analogy |
    |---|---|---|
    | config.py | Configuration management | Hospital administrator |
    | extract.py | Data extraction | Ambulance driver |
    | transform.py | Data cleaning | Surgeon |
    | load_postgres.py | PostgreSQL loading | Recovery ward nurse |
    | load_mongo.py | MongoDB loading | Rehabilitation specialist |
    | main.py | Orchestration | Chief of Medicine |


    Why this matters:
    • Testability: I can test the transformation logic without needing a database connection
    • Maintainability: Changing the data source doesn't break the loading logic
    • Scalability: Adding a new destination (like Snowflake) means adding one new loader module and one call in main.py
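    One way to push the "one new file per destination" idea further is a small loader registry, so the orchestration loop never changes. This is a hypothetical pattern, not how the repository is wired: the registry, the decorator and the stub loaders are illustrative names, and the stubs only record what they were given.

```python
from typing import Callable, Dict

import pandas as pd

# Hypothetical plug-in registry: every destination exposes one function
# that takes the cleaned DataFrame. Names are illustrative only.
Loader = Callable[[pd.DataFrame], None]
LOADERS: Dict[str, Loader] = {}
calls = []  # records stub invocations so the sketch can be checked


def register(name: str) -> Callable[[Loader], Loader]:
    """Decorator that files a loader under a destination name."""
    def wrap(fn: Loader) -> Loader:
        LOADERS[name] = fn
        return fn
    return wrap


@register("postgres")
def postgres_stub(df: pd.DataFrame) -> None:
    calls.append(("postgres", len(df)))  # a real module would call load_to_postgres


@register("mongo")
def mongo_stub(df: pd.DataFrame) -> None:
    calls.append(("mongo", len(df)))  # a real module would call load_to_mongo


def load_all(df: pd.DataFrame, destinations: list) -> None:
    """Send the same cleaned data to every configured destination, in order."""
    for name in destinations:
        LOADERS[name](df)
```

    Under this pattern a Snowflake loader would be one new module with its own @register("snowflake") function (imported somewhere at startup) plus "snowflake" in the configured destination list.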


    main.py:

```python
import logging
import sys

from etl_pipeline.config import Config
from etl_pipeline.extract import extract_data
from etl_pipeline.transform import transform_data
from etl_pipeline.load_postgres import load_to_postgres
from etl_pipeline.load_mongo import load_to_mongo

# Configure logging to stdout
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)


def main():
    logging.info("ETL Application pipeline initialized.")

    try:
        # 1. Extract
        if Config.DATA_SOURCE_TYPE == "sheets":
            logging.info(f"Starting extraction from Google Sheets (ID: {Config.GOOGLE_SHEET_ID})")
            data = extract_data(
                source_type="sheets",
                sheet_id=Config.GOOGLE_SHEET_ID,
            )
        else:
            logging.error(f"Unknown data source: {Config.DATA_SOURCE_TYPE}")
            sys.exit(1)  # SystemExit is not an Exception, so it skips the handler below

        logging.info(f"Extracted {len(data)} rows.")

        # 2. Transform
        logging.info("Step 2: Transform")
        transformed_data = transform_data(data)
        logging.info(f"Transformed Data Shape: {transformed_data.shape}")

        # 3. Load to PostgreSQL
        logging.info("Step 3: Load to PostgreSQL")
        load_to_postgres(transformed_data, Config.POSTGRES_URL)

        # 4. Load to MongoDB
        logging.info("Step 4: Load to MongoDB")
        load_to_mongo(transformed_data, Config.MONGO_URI, Config.MONGO_DB)

        logging.info("ETL pipeline completed successfully.")

    except Exception:
        # Log the full traceback and exit non-zero so cron/Airflow see the failure
        logging.exception("ETL failed")
        sys.exit(1)


if __name__ == "__main__":
    main()
```







    The Code Walkthrough

    Extraction: Pandas Does the Heavy Lifting





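```python
import pandas as pd


def extract_from_public_sheet(sheet_id):
    # Google Sheets serves a CSV export of a link-shared sheet at this URL
    export_url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv"
    # pandas downloads the URL and parses the CSV straight into a DataFrame
    df = pd.read_csv(export_url)
    return df
```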







    The magic: Google Sheets can export any sheet shared as "Anyone with the link can view" as CSV, and pandas reads that CSV directly from the URL. No authentication is needed for link-shared sheets!
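    The export URL also accepts a gid parameter that picks a specific tab; without it, the export returns the first tab. A gid is the number after "#gid=" in the sheet's browser address. A minimal sketch of a URL builder, where build_export_url is a hypothetical helper rather than part of the pipeline:

```python
from typing import Optional
from urllib.parse import urlencode


def build_export_url(sheet_id: str, gid: Optional[int] = None) -> str:
    """CSV export URL for a link-shared Google Sheet, optionally for one tab."""
    params = {"format": "csv"}
    if gid is not None:
        params["gid"] = gid  # select a tab other than the first
    return f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?{urlencode(params)}"
```

    The result can go straight into pd.read_csv(build_export_url(sheet_id, gid=...)). If the sheet is not link-shared, the request won't return CSV, so a failed or garbled extraction is usually a sharing-settings problem.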


    Transformation: Clean Data or Bust





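```python
import pandas as pd


def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    required_columns = ["id", "quantity", "product_name",
                        "total_amount", "payment_method", "customer_type"]

    # Keep only the essential fields (raises KeyError if one is missing)
    df_transformed = df[required_columns].copy()
    # Drop rows with no transaction ID, then keep the first row for each ID
    df_transformed = df_transformed.dropna(subset=["id"])
    df_transformed = df_transformed.drop_duplicates(subset=["id"])

    return df_transformed
```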







    Key decisions:
    • Only keep essential columns (data minimization)
    • Remove duplicates by transaction ID (data integrity)
    • Drop rows with missing IDs (every stored record stays identifiable)


    Loading: Two Paths, One Pipeline

    PostgreSQL with SQLAlchemy:






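```python
from sqlalchemy import create_engine


def load_to_postgres(df, db_url, table_name="transactions"):
    engine = create_engine(db_url)
    try:
        # 'replace' drops and recreates the table, so every run is a full refresh
        df.to_sql(table_name, engine, if_exists="replace", index=False)
    finally:
        engine.dispose()  # release pooled connections
```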
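    The if_exists="replace" choice is what makes re-running the PostgreSQL load safe. A quick way to see the difference from "append" is the sketch below, which uses Python's built-in sqlite3 as an in-memory stand-in for PostgreSQL (pandas' to_sql accepts a sqlite3 connection directly). The sample data is made up.

```python
import sqlite3

import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "total_amount": [150.0, 60.0, 210.0]})


def row_count(conn, table):
    # Table names can't be bound as parameters, hence the f-string
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL engine

# 'replace' drops and recreates the table, so a second run changes nothing
for _ in range(2):
    df.to_sql("transactions", conn, if_exists="replace", index=False)
replace_count = row_count(conn, "transactions")  # 3

# 'append' keeps existing rows, so a second run duplicates every row
for _ in range(2):
    df.to_sql("transactions_appended", conn, if_exists="append", index=False)
append_count = row_count(conn, "transactions_appended")  # 6
```

    The trade-off is that "replace" also drops anything added to the table by hand, such as indexes or a primary key, on every run.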







    MongoDB with PyMongo:






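```python
from pymongo import MongoClient


def load_to_mongo(df, mongo_uri, db_name, collection_name="transactions"):
    records = df.to_dict("records")  # one dict (document) per row
    with MongoClient(mongo_uri) as client:  # closes the connection when done
        collection = client[db_name][collection_name]
        # Clear the previous load first, mirroring PostgreSQL's if_exists="replace";
        # otherwise every rerun would insert another copy of each transaction
        collection.delete_many({})
        if records:  # insert_many raises on an empty list
            collection.insert_many(records)
```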
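    An alternative to rewriting the whole collection on every run is to upsert each document keyed on the transaction id, so reruns update existing documents instead of adding copies. A sketch using PyMongo's bulk_write with ReplaceOne(upsert=True); the helper names are hypothetical, and the pure upsert_pairs part is split out so it can be checked without a MongoDB server.

```python
from typing import Any, Dict, List, Tuple


def upsert_pairs(records: List[Dict[str, Any]], key: str = "id") -> List[Tuple[dict, dict]]:
    """One (filter, replacement) pair per record, matched on the transaction id."""
    return [({key: rec[key]}, rec) for rec in records]


def upsert_to_mongo(records: List[Dict[str, Any]], collection, key: str = "id") -> None:
    """Replace-or-insert each record so reruns don't create duplicate documents."""
    from pymongo import ReplaceOne  # imported here so upsert_pairs works without PyMongo

    ops = [ReplaceOne(flt, doc, upsert=True) for flt, doc in upsert_pairs(records, key)]
    if ops:  # skip the round trip when there is nothing to write
        collection.bulk_write(ops, ordered=False)
```

    Pairing this with a unique index, collection.create_index("id", unique=True), lets MongoDB itself reject a second document with the same transaction id.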







    [Screenshot: successful ETL run]








    Docker: The "It Works on My Machine" Killer

    One of the biggest headaches in software is environment setup. "It works on my machine!" is the developer's equivalent of "the dog ate my homework."


    Docker solves this by containerizing everything. My entire stack (the Python app, PostgreSQL and MongoDB) runs in isolated containers that behave the same on any machine with Docker installed.


    The docker-compose.yml Magic





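```yaml
# Excerpt: ports, volumes and the app's connection settings are omitted
services:
  postgres:
    image: postgres:15
    # PostgreSQL runs in its own container; the image won't start without a password
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      retries: 10

  mongo:
    image: mongo:6
    # MongoDB runs in its own container
    healthcheck:
      test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping')"]
      interval: 5s
      retries: 10

  etl-app:
    build: .
    # Wait until both databases report healthy, not merely "started"
    depends_on:
      postgres:
        condition: service_healthy
      mongo:
        condition: service_healthy
```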







    To run the entire system:






```shell
docker compose up -d --build
docker compose exec etl-app python main.py
```
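    Even when Compose starts the databases first, the very first connection can still race a database that is finishing its initialization. A small retry wrapper with exponential backoff is a common safeguard; this is a generic sketch, not code from the repository, and with_retries is a hypothetical name.

```python
import logging
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(fn: Callable[[], T], attempts: int = 5, delay: float = 2.0,
                 retry_on: Tuple[Type[BaseException], ...] = (Exception,)) -> T:
    """Call fn(), retrying with exponential backoff while it raises retry_on."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            wait = delay * 2 ** (attempt - 1)  # 2s, 4s, 8s, ... with the defaults
            logging.warning("Attempt %d/%d failed (%s); retrying in %.1fs",
                            attempt, attempts, exc, wait)
            time.sleep(wait)
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

    For PostgreSQL, a call such as with_retries(lambda: engine.connect().close(), retry_on=(OperationalError,)), with OperationalError imported from sqlalchemy.exc, would check connectivity before the load step runs.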










    Key Lessons & Design Decisions

    Why Two Databases?

    | Use case | Database | Why |
    |---|---|---|
    | Financial reports | PostgreSQL | ACID compliance, SQL support |
    | Mobile app API | MongoDB | JSON-native, flexible schema |
    | Complex joins | PostgreSQL | Relational model excels |
    | Rapid prototyping | MongoDB | No schema migrations needed |


    Why Python?

    • Pandas: Industry-standard for data manipulation
    • SQLAlchemy: Database toolkit whose bound parameters protect against SQL injection; here it supplies the engine behind pandas' to_sql
    • PyMongo: Lightweight MongoDB driver
    • Rich ecosystem: Libraries for everything
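    The SQL-injection protection comes from bound parameters: values travel separately from the SQL text, so a crafted string is compared as data instead of being executed. A minimal sketch with SQLAlchemy's text() construct, using an in-memory SQLite engine as a stand-in for PostgreSQL; the table contents are made up. Formatting values into the SQL string yourself bypasses this protection.

```python
from sqlalchemy import create_engine, text

# In-memory SQLite stands in for PostgreSQL; the :name bind syntax is the same
engine = create_engine("sqlite://")

with engine.begin() as conn:  # one connection and transaction for the whole demo
    conn.execute(text("CREATE TABLE transactions (id INTEGER, payment_method TEXT)"))
    conn.execute(text("INSERT INTO transactions VALUES (:id, :pm)"),
                 [{"id": 1, "pm": "Cash"}, {"id": 2, "pm": "Card"}])

    query = text("SELECT id FROM transactions WHERE payment_method = :method")
    honest = conn.execute(query, {"method": "Cash"}).scalars().all()
    # The classic injection string is matched literally, so nothing comes back
    hostile = conn.execute(query, {"method": "Cash' OR '1'='1"}).scalars().all()

print(honest)   # [1]
print(hostile)  # []
```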


    Why Modular Design?

    Think of it like LEGO blocks. Each module is a self-contained piece that:
    • Can be tested independently
    • Can be replaced without breaking others
    • Makes debugging a breeze





    Future Enhancements

    This pipeline runs end to end today, but here's what could come next:

    1. Scheduling: Run automatically every hour with Apache Airflow or cron
    2. Message Queues: Use Kafka/RabbitMQ for async processing at scale
    3. Data Validation: Add Great Expectations for data quality checks
    4. Monitoring: Add Prometheus/Grafana for pipeline observability
    5. More Sources: Extend to pull from APIs, S3, or other databases
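    Great Expectations is the tool named above for data validation. As a much lighter stand-in that shows the idea (plain pandas, not the Great Expectations API), rule-based checks can report which transaction IDs break which rule before anything is loaded. The rules and the allowed payment methods are assumptions, not taken from the real sheet.

```python
import pandas as pd

# Assumed set of valid payment methods; adjust to what the sheet really uses
ALLOWED_PAYMENT_METHODS = {"Cash", "Card", "Digital"}


def validate(df: pd.DataFrame) -> dict:
    """Return {rule_name: [offending ids]} for every rule that fails."""
    qty = pd.to_numeric(df["quantity"], errors="coerce")        # bad values -> NaN
    total = pd.to_numeric(df["total_amount"], errors="coerce")
    checks = {
        "quantity_not_positive": ~(qty > 0),                    # also flags NaN
        "total_negative_or_missing": ~(total >= 0),             # also flags NaN
        "unknown_payment_method": ~df["payment_method"].isin(ALLOWED_PAYMENT_METHODS),
    }
    return {name: df.loc[mask, "id"].tolist() for name, mask in checks.items() if mask.any()}


# Made-up rows with one problem of each kind
sample = pd.DataFrame({
    "id": [1, 2, 3],
    "quantity": [2, 0, "three"],
    "total_amount": [150.0, 60.0, -5.0],
    "payment_method": ["Cash", "Cheque", "Card"],
})
print(validate(sample))
# {'quantity_not_positive': [2, 3], 'total_negative_or_missing': [3], 'unknown_payment_method': [2]}
```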





    Conclusion

    Building this ETL pipeline taught me that good data engineering is invisible. When it works, nobody notices: the reports are accurate, the app loads fast, and decisions get made with confidence.


    But behind that invisibility is careful architecture: modular code, dual-database strategy, containerized deployment, and clean data transformations.


    Whether you're a business analyst who just wants clean data, or an engineer looking to build your own pipeline, I hope this walkthrough demystified the magic behind turning chaotic spreadsheets into business intelligence gold.


    The supermarket never runs out of your favorite snacks because somewhere, a data pipeline is quietly doing its job.


    If you're interested in the code, check out the repository here: GitHub Repo






