How I Built a Real Time DDoS Detection Engine from Scratch

  • MyrinNew
    Senior Member
    • Feb 2024
    • 5168

    #1


    Imagine you run a cloud storage platform. Thousands of users upload files, share documents, and log in every day. Then one afternoon, traffic suddenly spikes: thousands of requests per second hammer your server from a single IP address. Your server slows down. Legitimate users can't log in. You're under attack.


    The traditional answer is Fail2Ban, a tool that watches logs and blocks IPs. But what if you had to build that yourself, from first principles? That's exactly what this project is: a custom anomaly detection daemon that watches HTTP traffic in real time, learns what "normal" looks like, and automatically blocks attackers via iptables.

    No Fail2Ban. No rate limiting libraries. Just Python, math, and Linux.


    What the System Does

    1. Nginx receives all incoming HTTP requests and writes each one as a JSON log line
    2. The detector daemon tails that log file continuously, line by line
    3. For every request, it asks: "Is this IP behaving abnormally compared to recent history?"
    4. If yes, it adds an iptables DROP rule to block that IP at the kernel level
    5. It sends a Slack alert so the team knows what happened
    6. After a timeout (10 minutes, 30 minutes, 2 hours, or permanent depending on repeat offenders), it automatically lifts the ban
    7. A live web dashboard shows banned IPs, traffic rates, and system health in real time


    Part 1: Reading the Logs - The Monitor


    Before we can detect anything, we need to read the Nginx access log. Nginx writes one line per HTTP request. We configure it to write in JSON format so parsing is clean:






    {
      "source_ip": "1.2.3.4",
      "timestamp": "2025-01-01T12:00:00+00:00",
      "method": "GET",
      "path": "/index.php",
      "status": 200,
      "response_size": 1024
    }
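To make the parsing step concrete, here is a minimal sketch of turning one such line into an object. The LogEntry fields simply mirror the JSON keys above; the dataclass layout and the choice to return None for malformed lines are assumptions for illustration, not necessarily how the project does it.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class LogEntry:
    source_ip: str
    timestamp: float  # seconds since the Unix epoch
    method: str
    path: str
    status: int
    response_size: int


def parse(line: str) -> Optional[LogEntry]:
    """Turn one JSON log line into a LogEntry, or None if it is malformed."""
    try:
        raw = json.loads(line)
        return LogEntry(
            source_ip=raw["source_ip"],
            timestamp=datetime.fromisoformat(raw["timestamp"]).timestamp(),
            method=raw["method"],
            path=raw["path"],
            status=int(raw["status"]),
            response_size=int(raw["response_size"]),
        )
    except (ValueError, KeyError, TypeError):
        return None  # skip truncated or non-JSON lines instead of crashing


sample = ('{"source_ip": "1.2.3.4", "timestamp": "2025-01-01T12:00:00+00:00", '
          '"method": "GET", "path": "/index.php", "status": 200, "response_size": 1024}')
entry = parse(sample)
```

Returning None rather than raising keeps a single half-written line (which can happen when tailing a file that is being appended to) from taking the monitor thread down.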







    The monitor runs as a background thread and tails this file continuously. Here's the core idea:






    while True:
        line = file.readline()
        if not line:
            time.sleep(0.1)      # nothing new yet, wait a moment
            continue
        entry = parse(line)      # turn JSON into a LogEntry object
        queue.put(entry)         # hand it off to the detector







    Why a queue? The monitor's job is purely I/O: reading lines as fast as they arrive. The detector's job is CPU work: running math on each entry. Separating them with a queue means they run independently. If detection is briefly slow, the queue buffers entries and nothing is lost.
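The handoff can be sketched with the standard library's thread-safe queue.Queue. The function names, the None sentinel, and the upper() placeholder are only there to make the demo terminate and be checkable; the real daemon runs forever and does detection instead.

```python
import queue
import threading

log_queue = queue.Queue()  # unbounded, so the producer never blocks
processed = []


def monitor(lines):
    """Producer: stands in for the log tailer."""
    for line in lines:
        log_queue.put(line)
    log_queue.put(None)  # sentinel: no more input (demo only)


def detector():
    """Consumer: stands in for the detection loop."""
    while True:
        item = log_queue.get()  # blocks until something is available
        if item is None:
            break
        processed.append(item.upper())  # placeholder for the real math


producer = threading.Thread(target=monitor, args=(["a", "b", "c"],))
consumer = threading.Thread(target=detector)
producer.start()
consumer.start()
producer.join()
consumer.join()
```

An unbounded queue trades memory for safety: under a sustained flood it will grow, so a production version might cap its size and decide explicitly what to drop.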


    Handling log rotation: Nginx's log file is periodically rotated, and a fresh file takes its place at the same path. If we don't handle this, our tail would keep reading the old file and miss all new traffic. We detect rotation by watching the file's inode, a unique ID the filesystem assigns to each file. If the inode at the log path changes, we reopen the file from the beginning.
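One way to express the inode check, as a sketch: compare the inode of the path on disk with the inode of the file handle we already hold. The helper name was_rotated is hypothetical; os.stat and os.fstat are the standard calls.

```python
import os


def was_rotated(path: str, open_file) -> bool:
    """Return True if `path` now points at a different file than `open_file`."""
    try:
        on_disk = os.stat(path)
    except FileNotFoundError:
        return False  # mid-rotation: the new file does not exist yet
    opened = os.fstat(open_file.fileno())
    # Compare device as well as inode, since inode numbers are only unique per filesystem.
    return (on_disk.st_ino, on_disk.st_dev) != (opened.st_ino, opened.st_dev)
```

The tail loop would call this whenever readline() returns nothing, and reopen the path when it returns True.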


    Part 2: The Sliding Window - Counting Requests Without Gaps


    The most important question is: "How many requests has this IP sent in the last 60 seconds?"


    The naive approach is to keep a counter per IP and reset it every minute. But that creates a blind spot: if an attacker sends 1000 requests at 00:59 and 1000 more at 01:01, the two batches land in different one-minute buckets, each bucket looks half as bad as the attack really is, and you never see the true burst.


    The correct approach is a sliding window: track the exact timestamp of every request, and at any moment count only those within the last 60 seconds.
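The blind spot is easy to reproduce with made-up timestamps. This sketch counts the same 2000 requests both ways; the numbers are the ones from the example above, expressed as seconds since some starting point.

```python
from collections import Counter

# 1000 requests at t=59s and 1000 more at t=61s.
timestamps = [59.0] * 1000 + [61.0] * 1000

# Fixed one-minute buckets: each batch lands in a different bucket.
buckets = Counter(int(ts // 60) for ts in timestamps)
worst_bucket = max(buckets.values())   # 1000

# Sliding window evaluated at t=61s: everything newer than t=1s counts.
now = 61.0
in_window = sum(1 for ts in timestamps if ts > now - 60)   # 2000
```

The fixed buckets never report more than 1000, while the sliding window sees all 2000 requests that arrived within two seconds of each other.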


    We use Python's collections.deque for this:






    from collections import deque

    class SlidingWindowCounter:
        def __init__(self, window_seconds=60):
            self.window_seconds = window_seconds
            self._timestamps = deque()  # stores arrival times, oldest on the left

        def record(self, ts: float):
            self._timestamps.append(ts)  # new request arrives at the right end

        def evict_and_count(self, now: float) -> int:
            cutoff = now - self.window_seconds

            # Remove expired entries from the LEFT (oldest end)
            while self._timestamps and self._timestamps[0] < cutoff:
                self._timestamps.popleft()

            return len(self._timestamps)  # everything left is within the window

        def rate(self, now: float) -> float:
            # average requests per second over the window
            return self.evict_and_count(now) / self.window_seconds







    Why a deque and not a list?


    A regular Python list is slow at removing from the front: it has to shift every element left, which is O(n). A deque (double-ended queue) removes from either end in O(1). Since we always append new entries to the right and evict old entries from the left, a deque is the perfect data structure.


    The eviction logic: We don't scan the whole deque looking for expired entries. We only look at the leftmost entry (index 0), the oldest one. If it's older than 60 seconds, we pop it and check the next one. Because entries are always appended in time order, once we find an entry that's within the window, everything to its right is also within the window. We stop there.


    In steady traffic, this evicts 0 or 1 entries per call, which is effectively instant.


    We maintain one counter per IP and one global counter for all traffic combined.
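A sketch of how the per-IP and global counters might be wired together. TrafficTracker and its method names are hypothetical; the counter is a compact copy of the class above so the example runs on its own.

```python
from collections import defaultdict, deque


class SlidingWindowCounter:
    """Compact copy of the counter above so this sketch is self-contained."""

    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def record(self, ts):
        self._timestamps.append(ts)

    def evict_and_count(self, now):
        cutoff = now - self.window_seconds
        while self._timestamps and self._timestamps[0] < cutoff:
            self._timestamps.popleft()
        return len(self._timestamps)


class TrafficTracker:  # hypothetical name
    def __init__(self):
        self.global_window = SlidingWindowCounter()
        # defaultdict creates a fresh counter the first time an IP is seen
        self.ip_windows = defaultdict(SlidingWindowCounter)

    def record(self, ip, ts):
        self.global_window.record(ts)
        self.ip_windows[ip].record(ts)

    def ip_rate(self, ip, now):
        window = self.ip_windows[ip]
        return window.evict_and_count(now) / window.window_seconds


tracker = TrafficTracker()
for t in range(120):  # one request per second for two minutes
    tracker.record("1.2.3.4", float(t))
tracker.record("5.6.7.8", 119.0)
```

At now=119.0 the first IP has 61 timestamps left in its window (59 through 119), and the global window holds those plus the single request from the second IP. A long-running daemon would also want to delete counters for IPs that have gone quiet, which this sketch leaves out.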


    Part 3: Learning What "Normal" Looks Like - The Rolling Baseline


    The sliding window tells us the current rate. But we can't know if that rate is suspicious without knowing what's normal for this server.


    This is the baseline. It answers: "Historically, how many requests per second does this server receive?"


    Building the baseline

    Every second, we take a snapshot of the global request rate (the 60-second window count divided by 60, so the unit is req/s) and store it:

    # maxlen=1800 means we keep 30 minutes of history (30 * 60 = 1800 seconds)
    self._per_second_counts = deque(maxlen=1800)

    # Every second:
    current_rate = global_window.rate(now)  # req/s, the same unit as the floors and z-score used later
    self._per_second_counts.append((now, current_rate))







    The maxlen=1800 is doing something clever: when the deque is full and we append a new entry, Python automatically drops the oldest entry from the left. We get a rolling 30-minute window with zero manual cleanup code.
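The same behaviour at a size small enough to inspect by hand (maxlen=3 instead of 1800, purely for illustration):

```python
from collections import deque

history = deque(maxlen=3)  # tiny maxlen so the effect is easy to see
for second in range(5):
    history.append(second)  # once full, each append drops the oldest item

remaining = list(history)   # [2, 3, 4]
```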

    Computing mean and standard deviation

    Every 60 seconds we recalculate from the stored history:






    counts = [c for _, c in self._per_second_counts]
    n = len(counts)

    mean = sum(counts) / n
    variance = sum((x - mean) ** 2 for x in counts) / n
    stddev = math.sqrt(variance)

    # Apply floors to prevent false positives during idle periods
    mean = max(mean, 1.0) # assume at least 1 req/s as baseline
    stddev = max(stddev, 0.5) # assume at least 0.5 req/s variation







    Why floor values? If the server is idle at 3am with zero traffic, mean=0 and stddev=0. Then a single request means dividing by zero (in effect an infinite z-score) and triggers a false alarm. The floors prevent this: they say "even in the quietest period, assume some baseline activity."
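A small worked example of what the floors do, assuming the floor values quoted above (1.0 req/s and 0.5 req/s). The zscore helper is hypothetical and applies the floors inline rather than at recalculation time.

```python
def zscore(rate, mean, stddev, mean_floor=1.0, stddev_floor=0.5):
    """Z-score with floors applied to the baseline before dividing."""
    mean = max(mean, mean_floor)
    stddev = max(stddev, stddev_floor)
    return (rate - mean) / stddev


# Idle server: history is all zeros, then one request lands in the 60 s window.
single_request = zscore(rate=1 / 60, mean=0.0, stddev=0.0)   # negative, no alarm

# Same idle history, but an IP suddenly sends 10 req/s.
burst = zscore(rate=10.0, mean=0.0, stddev=0.0)              # (10 - 1) / 0.5 = 18.0
```

Without the floors the first call would raise ZeroDivisionError; with them a lone request scores below zero while a real burst still scores far above the threshold of 3.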


    Per-hour slots


    Traffic at 3am looks different from traffic at 3pm. We store separate baseline stats per hour of the day:






    current_hour = datetime.now().hour
    self._hour_stats[current_hour] = BaselineStats(mean, stddev, ...)







    When looking up the baseline, we prefer the current hour's stats if they have enough samples. This means the detector naturally adapts to day/night traffic patterns without any manual configuration.
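The lookup with fallback might look like the sketch below. The samples field, the MIN_SAMPLES threshold of 300, and the function name are all assumptions; the post only says the hourly stats are preferred when they have "enough samples".

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class BaselineStats:
    mean: float
    stddev: float
    samples: int  # assumed field: how many data points back these stats


MIN_SAMPLES = 300  # assumed threshold, not taken from the post


def lookup_baseline(hour_stats: Dict[int, BaselineStats],
                    rolling: BaselineStats, hour: int) -> BaselineStats:
    """Prefer the hour-specific baseline; fall back to the rolling one."""
    stats = hour_stats.get(hour)
    if stats is not None and stats.samples >= MIN_SAMPLES:
        return stats
    return rolling


rolling = BaselineStats(mean=5.0, stddev=1.0, samples=1800)
hour_stats = {
    15: BaselineStats(mean=12.0, stddev=2.0, samples=1800),  # well-sampled afternoon
    3: BaselineStats(mean=1.0, stddev=0.5, samples=40),      # too few samples so far
}
```

Here hour 15 returns its own stats, while hour 3 (too few samples) and hour 9 (no entry yet) both fall back to the rolling baseline.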


    Part 4: Making the Decision - Z Score and the 5x Rule

    At this point we have two inputs:
    • The current rate for an IP (from the sliding window)
    • The baseline mean and stddev (from the rolling window)


    We combine these into a z score:






    z = (current_rate - baseline_mean) / baseline_stddev







    The z-score measures how many standard deviations above normal the current rate is. A z-score of 3.0 means the rate is 3 standard deviations above the mean; if traffic were roughly normally distributed, that would happen by chance less than 0.3% of the time under normal conditions.
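The "less than 0.3%" figure can be checked against the standard normal distribution using only the math module. This assumes normally distributed rates, which real traffic only approximates.

```python
import math


def upper_tail(z: float) -> float:
    """P(Z > z) for a standard normal variable."""
    return 0.5 * math.erfc(z / math.sqrt(2))


p_one_sided = upper_tail(3.0)    # chance of being 3+ stddevs ABOVE the mean
p_two_sided = 2 * p_one_sided    # chance of being 3+ stddevs away in either direction
```

The one-sided probability is roughly 0.13% and the two-sided one roughly 0.27%, so both are consistent with the bound quoted above. Since the detector only cares about rates above the mean, the one-sided value is the more relevant of the two.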






    def _check_ip(self, ip, baseline, now):
        ip_rate = self.tracker.get_ip_rate(ip)
        zscore = (ip_rate - baseline.mean) / baseline.stddev

        if zscore > 3.0:
            return AnomalyEvent(condition=f"z-score {zscore:.2f} > 3.0", ...)

        if ip_rate > baseline.mean * 5.0:
            return AnomalyEvent(condition=f"rate {ip_rate:.2f}/s > 5x baseline", ...)

        return None  # neither rule fired







    Why two rules? They catch different scenarios:
    • Z score catches relative anomalies. If the baseline is 10 req/s with a stddev of 5 and someone hits 40 req/s, the z-score is 6 and the rule fires because that's unusual relative to history, even though 40 is still below the 5x mark of 50.
    • 5x rule catches large absolute spikes that the z-score can miss when the history itself is noisy. If the baseline is 10 req/s but bursty (stddev of 20) and someone hits 55 req/s, the z-score is only 2.25 and stays quiet, but 55 > 5 x 10 catches it immediately.
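To see that the two rules really do fire on different inputs, here is a sketch that evaluates both on illustrative numbers (a steady baseline and a bursty one). The function name and the sample values are made up for the demonstration.

```python
def fired_rules(rate, mean, stddev, z_threshold=3.0, multiplier=5.0):
    """Return the names of the rules that fire for this rate and baseline."""
    rules = []
    if (rate - mean) / stddev > z_threshold:
        rules.append("zscore")
    if rate > mean * multiplier:
        rules.append("5x")
    return rules


steady = fired_rules(rate=40, mean=10, stddev=5)    # z = 6.0, but 40 < 50
bursty = fired_rules(rate=55, mean=10, stddev=20)   # z = 2.25, but 55 > 50
both = fired_rules(rate=100, mean=10, stddev=5)     # z = 18.0 and 100 > 50
```

With a tight stddev only the z-score fires; with a wide stddev only the 5x rule fires; a large enough spike trips both.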


    Error surge tightening


    There's a sneaky attack pattern: low-and-slow scanning. An attacker sends just a few requests per second, below detection thresholds, but most of them return 404 errors because they're probing for vulnerabilities (/wp-admin, /.env, /phpMyAdmin, etc.).

    We detect this separately: if an IP's 4xx/5xx error rate is 3x higher than normal, we tighten the detection thresholds by 30%:






    if error_surge_detected:
        zscore_threshold = 3.0 * 0.7   # → 2.1 (easier to trigger)
        rate_multiplier = 5.0 * 0.7    # → 3.5 (easier to trigger)







    This catches the scanner without blocking every IP that occasionally gets a 404.
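A sketch of the surge check and the tightened thresholds together. It assumes "error rate" means the fraction of an IP's recent requests that returned a 4xx or 5xx status, and it adds a small floor on the baseline error rate (error_floor, an assumption) so that a server with no errors at all does not flag the very first 404.

```python
def thresholds(ip_error_rate, baseline_error_rate,
               surge_factor=3.0, tighten=0.7, error_floor=0.01):
    """Return (zscore_threshold, rate_multiplier) for one IP."""
    zscore_threshold, rate_multiplier = 3.0, 5.0
    normal = max(baseline_error_rate, error_floor)
    if ip_error_rate >= surge_factor * normal:
        # Error surge: make both rules 30% easier to trigger.
        zscore_threshold *= tighten
        rate_multiplier *= tighten
    return zscore_threshold, rate_multiplier


scanner = thresholds(ip_error_rate=0.90, baseline_error_rate=0.05)   # tightened
ordinary = thresholds(ip_error_rate=0.06, baseline_error_rate=0.05)  # unchanged
```

The scanner, with 90% of its requests failing against a 5% norm, gets the tighter pair of roughly (2.1, 3.5); the ordinary client keeps (3.0, 5.0).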


    Part 5: Blocking the IP with iptables


    When an anomaly fires, we need to block the IP immediately. We use iptables, the command-line front end to the Linux kernel's built-in packet filter.






    import subprocess

    def block_ip(ip: str):
        subprocess.run([
            "sudo", "iptables",
            "-I", "INPUT", "1",   # INSERT at position 1 (top of the chain)
            "-s", ip,             # source IP to match
            "-j", "DROP"          # silently discard matching packets
        ])







    Breaking this down:
    • -I INPUT 1 inserts our rule at the top of the INPUT chain, so it's checked before any other rule. The attacker's packets are dropped immediately.
    • -j DROP silently discards the packet. We don't send any response back (as opposed to -j REJECT, which sends an ICMP error). This is intentional: the attacker gets no feedback that they're blocked.


    To verify blocks are active, you can run:






    sudo iptables -L INPUT -n --line-numbers
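Because the IP string comes from a log line, it is worth validating before it reaches a command line. The sketch below builds the ban and unban commands without running them; the helper name iptables_cmd is hypothetical, and the IPv4-only restriction is an assumption (IPv6 traffic would need ip6tables instead).

```python
import ipaddress


def iptables_cmd(action: str, ip: str) -> list:
    """Build (but do not run) the iptables command to ban or unban `ip`."""
    addr = ipaddress.ip_address(ip)  # raises ValueError on anything that is not an IP
    if addr.version != 4:
        raise ValueError("this sketch is IPv4-only")
    if action == "ban":
        return ["sudo", "iptables", "-I", "INPUT", "1", "-s", str(addr), "-j", "DROP"]
    if action == "unban":
        # -D deletes the first rule matching this exact specification
        return ["sudo", "iptables", "-D", "INPUT", "-s", str(addr), "-j", "DROP"]
    raise ValueError(f"unknown action: {action}")


ban = iptables_cmd("ban", "1.2.3.4")
unban = iptables_cmd("unban", "1.2.3.4")
```

Passing the command as a list (rather than a shell string) already avoids shell injection; the ipaddress check additionally rejects garbage such as hostnames or option-like strings before iptables ever sees them.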







    The backoff schedule: escalating punishment

    Not all attacks are equal. A first-time offender gets an automatic unban after 10 minutes. A repeat offender escalates through longer bans: 30 minutes, then 2 hours, then permanent.


    We track how many times each IP has been banned across its entire history. When re-banning, we move to the next duration:






    ban_durations = [600, 1800, 7200, -1] # seconds; -1 = permanent

    count = self._ban_history.get(ip, 0) # how many times banned before
    duration = ban_durations[min(count, len(ban_durations) - 1)]
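Wrapping the lookup in a function makes the escalation easy to see. The function name is hypothetical; the durations are the ones listed above.

```python
BAN_DURATIONS = [600, 1800, 7200, -1]  # seconds; -1 = permanent


def next_ban_duration(previous_bans: int) -> int:
    """Pick the ban length for an IP that has been banned `previous_bans` times."""
    # min() clamps the index so every ban after the third stays permanent
    return BAN_DURATIONS[min(previous_bans, len(BAN_DURATIONS) - 1)]


schedule = [next_ban_duration(n) for n in range(6)]
# [600, 1800, 7200, -1, -1, -1]
```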







    Auto-unban runs in a background thread that wakes every 30 seconds, checks for expired bans, removes the iptables rule, and sends a Slack notification.
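A sketch of the unbanner's core logic. BanRecord, the release callback, and the injectable clock are assumptions made so the example is testable; in the real daemon, release would remove the iptables rule and post to Slack.

```python
import threading
from dataclasses import dataclass


@dataclass
class BanRecord:  # hypothetical record type
    ip: str
    banned_at: float  # epoch seconds
    duration: int     # seconds; -1 = permanent


def expired_bans(bans, now):
    """Return the records whose ban has run out; permanent bans never expire."""
    return [b for b in bans.values()
            if b.duration != -1 and now >= b.banned_at + b.duration]


def unbanner_loop(bans, lock, stop, release, clock, interval=30):
    """Wake every `interval` seconds until the `stop` event is set."""
    while not stop.wait(interval):  # wait() returns False on timeout
        with lock:  # the detection loop mutates `bans` too
            for record in expired_bans(bans, clock()):
                release(record)
                del bans[record.ip]
```

Using Event.wait as the sleep means the thread shuts down promptly when the daemon exits, instead of finishing a full 30-second sleep first.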


    Part 6: The Audit Log

    Every significant event (ban, unban, baseline recalculation) is written to a structured log file:






    [2025-01-01T12:00:00+00:00] BAN 1.2.3.4 | IP z-score 4.5 > 3.0 | rate=42.300/s | baseline=5.000/0.500 | 600s
    [2025-01-01T12:10:00+00:00] UNBAN 1.2.3.4 | expired | rate=0.000/s | baseline=5.000/0.500 |
    [2025-01-01T12:01:00+00:00] BASELINE_RECALC GLOBAL | samples=1800 hour=12 | rate=5.200/s | baseline=5.100/0.480 |







    This gives you a full timeline of what happened and why, which is invaluable for post-incident analysis.
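A formatter that reproduces the line layout shown above could look like this. The function name and parameter list are hypothetical; only the output format is taken from the sample lines.

```python
def format_audit_line(ts, action, target, detail, rate, mean, stddev, duration=None):
    """Render one audit entry in the pipe-separated layout shown above."""
    suffix = f" {duration}s" if duration is not None else ""
    return (f"[{ts}] {action} {target} | {detail} | rate={rate:.3f}/s | "
            f"baseline={mean:.3f}/{stddev:.3f} |{suffix}")


ban_line = format_audit_line("2025-01-01T12:00:00+00:00", "BAN", "1.2.3.4",
                             "IP z-score 4.5 > 3.0", 42.3, 5.0, 0.5, duration=600)
unban_line = format_audit_line("2025-01-01T12:10:00+00:00", "UNBAN", "1.2.3.4",
                               "expired", 0.0, 5.0, 0.5)
```

Keeping the field order fixed means the file can later be split on " | " for analysis without a custom parser.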


    Part 7: The Live Dashboard

    The dashboard is served by FastAPI and auto-refreshes every 3 seconds. It shows:
    • Current global req/s vs baseline mean
    • Number of currently banned IPs
    • CPU and memory usage
    • Daemon uptime
    • Table of all active bans with time remaining
    • Top 10 source IPs by request rate


    The frontend is plain JavaScript polling /api/metrics. No React, no build step, just a fetch() on a timer.


    Putting It All Together

    The main loop is intentionally simple:






    from queue import Empty  # raised by get_nowait() when the queue is drained

    while running:
        tracker.maybe_recalculate()  # recalc baseline every 60s

        # Drain the log queue in batches of up to 500 entries
        for _ in range(500):
            try:
                entry = queue.get_nowait()
            except Empty:
                time.sleep(0.05)  # nothing queued; pause briefly instead of busy-waiting
                break
            if blocker.is_banned(entry.source_ip):
                continue  # already blocked, skip

            tracker.record(entry)             # update sliding windows
            event = detector.evaluate(entry)  # check for anomaly

            if event and event.kind == PER_IP:
                record = blocker.ban(event.ip, event.condition)
                notifier.send_ban(event, record)
                audit_log.write(...)

            elif event and event.kind == GLOBAL:
                notifier.send_global_alert(event)







    Four threads run concurrently:

    Main thread: the detection loop above

    LogMonitor thread: tails the log file, feeds the queue

    Unbanner thread: wakes every 30s to release expired bans

    Dashboard thread: serves the FastAPI web UI


    Key Lessons

    1. Use the right data structure. A deque makes the sliding window O(1). The wrong choice (a list, a dict of minute buckets) would have made it O(n) or introduced measurement gaps.
    2. Don't hardcode thresholds. Every threshold in this system lives in config.yaml. Tuning detection sensitivity is a matter of editing one file, not hunting through code.
    3. The baseline needs a warmup period. The system needs 30 minutes of real traffic before the baseline is trustworthy. Floor values prevent false alarms during warmup.
    4. Two detection rules are better than one. Z score and the 5x multiplier cover different attack shapes. Neither alone is sufficient.
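    5. Be careful with iptables in Docker. The detector container needs cap_add: [NET_ADMIN] and must run as root. Rules applied inside the container affect the host's iptables chain only if the container shares the host's network namespace (for example with network_mode: host); otherwise they apply to the container's own namespace. Affecting the host is exactly what we want here, but worth understanding before you do it.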




    Internet → Nginx (JSON logs) → shared Docker volume

    Detection daemon
    ├── monitor.py (tail logs)
    ├── baseline.py (rolling stats)
    ├── detector.py (z-score logic)
    ├── blocker.py (iptables)
    ├── unbanner.py (backoff releases)
    ├── notifier.py (Slack)
    └── dashboard.py (FastAPI UI)








