I created a solution for AWS called Anomaly Guardian

  • MyrinNew
    Senior Member
    • Feb 2024





    Anomaly Guardian – A Real-Time Anomaly Detection and Response System

    What is this solution?

    Anomaly Guardian is designed to identify anomalies in real time and respond to them automatically, without relying on human intervention. This shortens reaction time and helps prevent major impacts on the business.


    What is not always evident is that this autonomy depends on an architecture that combines several AWS services, each handling a distinct stage from ingestion to response. This integration lets Anomaly Guardian act as a kind of digital watchdog: always alert, analyzing data as it is generated, and making decisions when behavior deviates from the norm.


    This type of approach often appears in companies that have experienced unexpected cost spikes or operational failures at critical times. When monitoring, automation, and response work together, the impact tends to be clearer: fewer surprises, more control, and an engineering team with more time to focus on what matters.


    What makes this solution different in practice

    • Simultaneous multi-pronged analysis – User behavior, transactions, location, and other signals are evaluated together to identify relevant deviations.
    • Automatic response to known incidents – If something has already been mapped as a risk, the system remediates it automatically, often before anyone notices.
    • Relationships that make sense – When an anomaly arises, the correlation engine cross-references events to understand if there is a common cause or a larger failure behind it.
    • Ability to predict and act before the problem arises – Based on what has happened before, the system scales or adjusts resources to avoid bottlenecks.
    • Designed for multi-client environments – Each company operates independently within the same shared structure, with its data isolated for security and privacy.





    Solution Architecture

    1. Data Ingestion Layer





    [API Gateway (REST API)] --> [Kinesis Data Streams] --> [Lambda Event Processor]







    Services used:
    • Amazon API Gateway
      Acts as the main RESTful entry point, collecting events from distributed systems or user interactions.
    • Amazon Kinesis Data Streams
      Handles real-time event streaming, ensuring events are reliably captured and queued.
    • AWS Lambda
      Performs the first layer of logic: filtering, enriching, or routing events as needed.


    Technical characteristics:
    • Ingestion rate above 10,000 events/second
    • Latency below 100ms for initial processing
    • Auto-scaling behavior according to workload volume
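    As a rough illustration of the first-layer processing, the sketch below shows what a Lambda event processor consuming the Kinesis stream might look like. The event shape (`Records[].kinesis.data`, base64-encoded, with a `sequenceNumber`) is the standard Kinesis-to-Lambda format; the required field list, the `receivedAt` enrichment, and the decision to drop incomplete events are assumptions for illustration, not the project's actual code.

```python
import base64
import json
from datetime import datetime, timezone

# Hypothetical minimum schema; the project's real required fields may differ.
REQUIRED_FIELDS = ("eventId", "userId", "timestamp", "eventType")


def parse_record(record):
    """Decode one Kinesis record: the payload arrives base64-encoded."""
    return json.loads(base64.b64decode(record["kinesis"]["data"]))


def process_batch(event):
    """Split a Kinesis batch into enriched valid events and failed records."""
    valid, failures = [], []
    for record in event.get("Records", []):
        try:
            payload = parse_record(record)
        except ValueError:  # bad base64, non-UTF-8 bytes, or invalid JSON
            failures.append({"itemIdentifier": record["kinesis"]["sequenceNumber"]})
            continue
        if isinstance(payload, dict) and all(f in payload for f in REQUIRED_FIELDS):
            # Enrichment example: stamp the processing time.
            payload["receivedAt"] = datetime.now(timezone.utc).isoformat()
            valid.append(payload)
        # Events missing required fields are dropped in this sketch;
        # a real pipeline might route them to a dead-letter queue instead.
    return valid, failures


def handler(event, context):
    valid, failures = process_batch(event)
    # Forwarding `valid` to the ML layer would happen here.
    return {"batchItemFailures": failures}
```

    Returning `batchItemFailures` only has an effect when `ReportBatchItemFailures` is enabled on the Kinesis event source mapping; otherwise Lambda treats the whole batch as one unit.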





    2. ML and Analytics Layer





    [Kinesis Analytics] --> [SageMaker Endpoint] --> [Elasticsearch (Indexing)]







    Services used:
    • Amazon Kinesis Analytics
      Performs SQL-based real-time analysis to identify behavioral patterns.
    • Amazon SageMaker
      Hosts machine learning models that classify anomalies on the fly.
    • Amazon Lookout for Metrics
      Monitors key business metrics and detects anomalies automatically — no manual rules required.
    • Amazon OpenSearch Service (formerly Amazon Elasticsearch Service)
      Indexes processed events to enable complex querying and pattern detection.


    Algorithms implemented:
    • Isolation Forest – Detects outliers in high-dimensional data.
    • Statistical Process Control (SPC) – Used to monitor and manage process variations.
    • Deep Learning Autoencoders – Well suited to identifying temporal anomalies in sequences.
    • Correlation Analysis – Finds relationships between multiple variables to uncover contextual anomalies.
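    To make the correlation idea concrete, here is a minimal sketch that tracks the Pearson correlation between two metrics over a sliding window and flags windows where a normally strong relationship breaks down. The metric pairing, window size, and 0.5 floor are illustrative assumptions, not values from the project.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return 0.0  # a flat series carries no correlation signal
    return cov / sqrt(var_x * var_y)


def correlation_breaks(xs, ys, window=10, floor=0.5):
    """Return start indices of windows whose correlation drops below `floor`."""
    breaks = []
    for start in range(len(xs) - window + 1):
        r = pearson(xs[start:start + window], ys[start:start + window])
        if r < floor:
            breaks.append(start)
    return breaks
```

    For example, incoming API requests and Lambda invocations normally rise and fall together; if requests keep climbing while invocations flatten, the windowed correlation drops and the break points to a common upstream cause.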





    3. Intelligence and Response Layer





    [Step Functions (Orchestration)] --> [Security Actions (Auto-remediation)] --> [Incident Response]







    Services used:
    • AWS Step Functions
      Coordinates complex workflows based on rule evaluation and event triggers.
    • Amazon SNS
      Publishes alerts across multiple channels (email, SMS, etc.), keeping teams informed.
    • AWS Lambda
      Executes targeted response actions based on incident type or rule match.
    • Amazon DynamoDB
      Stores event history, decision trees, and remediation rules in a scalable way.
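    The orchestration described above could be expressed as an Amazon States Language (ASL) definition. The sketch below builds one in Python: a Choice state routes on an `anomalyType` field, each branch invokes a remediation Lambda, and every path ends by publishing to SNS. The state names, ARNs, and the `anomalyType` input field are hypothetical placeholders.

```python
import json

# Hypothetical ARNs; replace with the functions and topic from your stack.
RATE_LIMIT_FN = "arn:aws:lambda:us-east-1:123456789012:function:ag-rate-limit"
STEP_UP_AUTH_FN = "arn:aws:lambda:us-east-1:123456789012:function:ag-step-up-auth"
ALERT_TOPIC = "arn:aws:sns:us-east-1:123456789012:ag-alerts"


def build_state_machine():
    """Return an ASL definition as a Python dict."""
    return {
        "Comment": "Route anomalies to a remediation action, then alert",
        "StartAt": "RouteByType",
        "States": {
            "RouteByType": {
                "Type": "Choice",
                "Choices": [
                    {"Variable": "$.anomalyType", "StringEquals": "velocity", "Next": "RateLimit"},
                    {"Variable": "$.anomalyType", "StringEquals": "geolocation", "Next": "StepUpAuth"},
                ],
                "Default": "Notify",
            },
            "RateLimit": {"Type": "Task", "Resource": RATE_LIMIT_FN, "Next": "Notify"},
            "StepUpAuth": {"Type": "Task", "Resource": STEP_UP_AUTH_FN, "Next": "Notify"},
            "Notify": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sns:publish",
                # Serialize the whole state input as the SNS message body.
                "Parameters": {"TopicArn": ALERT_TOPIC, "Message.$": "States.JsonToString($)"},
                "End": True,
            },
        },
    }
```

    The resulting `json.dumps(build_state_machine())` string is what `boto3.client("stepfunctions").create_state_machine(name=..., definition=..., roleArn=...)` expects as its `definition`.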


    Types of Detected Anomalies

    The system is designed to recognize different kinds of deviations in user behavior, transactions, and infrastructure. Each type of anomaly triggers a specific detection method and response path.





    1. Velocity Anomalies

    • Detection: Unusual number of requests from the same user or IP address
    • Threshold: More than 100 requests within 5 minutes
    • Response: Automatic rate limiting to mitigate abuse
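    A velocity rule like this (more than 100 requests in 5 minutes) maps naturally onto a sliding-window counter per user or IP. The sketch below keeps timestamps in memory; in a multi-instance deployment that state would have to live in a shared store such as DynamoDB or ElastiCache, which is an assumption beyond the original design.

```python
from collections import defaultdict, deque


class SlidingWindowCounter:
    """Flags a key (user ID or IP) that exceeds `limit` events within `window_seconds`."""

    def __init__(self, limit=100, window_seconds=300):
        self.limit = limit
        self.window = window_seconds
        self.events = defaultdict(deque)

    def record(self, key, ts):
        """Register an event at time `ts` (seconds); return True if the key is over the limit."""
        q = self.events[key]
        q.append(ts)
        # Drop timestamps that have fallen out of the window.
        while q and q[0] <= ts - self.window:
            q.popleft()
        return len(q) > self.limit
```

    A `True` result would be the trigger for the automatic rate-limiting response.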





    2. Value Anomalies

    • Detection: Transactions involving unusually large or inconsistent amounts
    • Algorithm: Detection based on moving standard deviation
    • Response: Manual review triggered for any transaction above $5,000
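    Moving-standard-deviation detection usually means comparing each amount with the mean and standard deviation of the user's recent transactions. A minimal sketch, assuming a 3-sigma cutoff and a 20-transaction history (both illustrative), combined with the $5,000 manual-review rule above:

```python
from collections import deque
from statistics import mean, stdev

REVIEW_THRESHOLD = 5_000  # manual review above this amount, per the rule above


class ValueAnomalyDetector:
    def __init__(self, history=20, sigmas=3.0, min_history=5):
        self.amounts = deque(maxlen=history)  # recent amounts for one user
        self.sigmas = sigmas
        self.min_history = min_history

    def check(self, amount):
        """Return (is_outlier, needs_review) for a new transaction amount."""
        is_outlier = False
        if len(self.amounts) >= self.min_history:
            mu, sd = mean(self.amounts), stdev(self.amounts)
            # A perfectly flat history (sd == 0) has no spread to compare against.
            is_outlier = sd > 0 and abs(amount - mu) > self.sigmas * sd
        self.amounts.append(amount)
        return is_outlier, amount > REVIEW_THRESHOLD
```

    Note that outliers are appended to the history too; a stricter variant would skip them so one spike does not widen the band for later checks.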





    3. Geolocation Anomalies

    • Detection: Sudden changes in user’s geographic location
    • Algorithm: Time-series analysis to track location shifts
    • Response: Extra layer of security validation
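    One common way to turn "sudden location changes" into a concrete rule is an impossible-travel check: compute the great-circle distance between consecutive logins and flag speeds no traveler could reach. This is a named substitute for the time-series method mentioned above, not necessarily what the project uses; it assumes events carry latitude and longitude (the sample payload only has a region string), and the 900 km/h cutoff is illustrative.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0
MAX_SPEED_KMH = 900.0  # roughly airliner cruising speed; illustrative cutoff


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points in kilometres."""
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


def is_impossible_travel(prev, curr):
    """prev/curr: (lat, lon, epoch_seconds). True if the implied speed is implausible."""
    distance = haversine_km(prev[0], prev[1], curr[0], curr[1])
    hours = max(curr[2] - prev[2], 1) / 3600  # clamp to 1 s to avoid division by zero
    return distance / hours > MAX_SPEED_KMH
```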





    4. Behavioral Anomalies

    • Detection: Unexpected or erratic browsing patterns
    • Algorithm: Session-based Machine Learning model
    • Response: Intensive monitoring mode enabled





    5. System Anomalies

    • Detection: Drops in performance or availability
    • Algorithm: Statistical Process Control (SPC)
    • Response: Auto-scaling and workload rebalancing


    Configuration and Deployment

    Setting up Anomaly Guardian is straightforward, as long as the basic environment and permissions are in place. Below is a quick guide to get everything running smoothly from the start.


    Prerequisites





    # AWS CLI v2+
    aws --version

    # Python 3.9+
    python3 --version

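    # Required IAM Permissions:
    # (All services need Full Access for deployment purposes)
    #   - CloudFormation
    #   - IAM (the stacks create named IAM resources, hence CAPABILITY_NAMED_IAM)
    #   - SageMaker
    #   - Kinesis
    #   - Lambda
    #   - DynamoDB
    #   - Elasticsearch (OpenSearch)
    #   - Step Functions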







    Automated Deployment





    # Clone the templates
    git clone
    cd anomaly-guardian

    # Set up environment variables
    export PROJECT_NAME="anomaly-guardian"
    export ENVIRONMENT="prod"
    export AWS_REGION="us-east-1"
    export EMAIL_ALERT="admin@company.com"

    # Run the deployment
    chmod +x deploy.sh
    ./deploy.sh deploy







    Manual Deployment

    Base Infrastructure Deployment





    aws cloudformation deploy \
    --template-file anomaly-guardian-base.yaml \
    --stack-name anomaly-guardian-prod-base \
    --parameter-overrides \
    Environment=prod \
    ProjectName=anomaly-guardian \
    AlertEmail=admin@company.com \
    --capabilities CAPABILITY_NAMED_IAM







    Advanced Components Deployment





    aws cloudformation deploy \
    --template-file anomaly-guardian-advanced.yaml \
    --stack-name anomaly-guardian-prod-advanced \
    --parameter-overrides \
    BaseStackName=anomaly-guardian-prod-base \
    Environment=prod \
    ProjectName=anomaly-guardian \
    --capabilities CAPABILITY_NAMED_IAM







    Monitoring and Observability

    Business Metrics

    • Anomalies Detected/Hour:
      Measures how many anomalies are identified every hour. Useful for spotting spikes or patterns in unusual behavior.
    • False Positive Rate:
      Helps evaluate model precision. A high rate might indicate overly aggressive detection rules.
    • Mean Time to Detection (MTTD):
      Tracks how long it typically takes to detect an anomaly after it occurs.
    • Mean Time to Response (MTTR):
      Measures the average time between detection and mitigation or escalation.
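    These metrics reduce to simple averages and ratios over incident records. The sketch below assumes each incident stores `occurredAt`, `detectedAt`, and `respondedAt` epoch timestamps plus an analyst verdict in `falsePositive`; those field names are hypothetical. The false positive rate follows this document's definition (share of alerts judged false).

```python
from statistics import mean


def incident_metrics(incidents):
    """MTTD and MTTR in seconds (confirmed incidents only) plus false positive rate."""
    confirmed = [i for i in incidents if not i["falsePositive"]]
    if confirmed:
        mttd = mean(i["detectedAt"] - i["occurredAt"] for i in confirmed)
        mttr = mean(i["respondedAt"] - i["detectedAt"] for i in confirmed)
    else:
        mttd = mttr = None  # nothing real was detected in this period
    fp_rate = sum(1 for i in incidents if i["falsePositive"]) / len(incidents)
    return {"mttd": mttd, "mttr": mttr, "falsePositiveRate": fp_rate}
```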


    Technical Metrics

    • API Gateway Latency:
      Observes the responsiveness of the ingestion endpoint.
    • Lambda Duration:
      Indicates how long your functions are taking to execute. Spikes may hint at processing bottlenecks.
    • Kinesis Incoming/Outgoing Records:
      Tracks streaming throughput, which is important for making sure no event backlog builds up.
    • SageMaker Endpoint Utilization:
      Reflects how actively the ML model is being used for inference.


    CloudWatch Dashboards

    To make monitoring more accessible, the system automatically generates a custom CloudWatch dashboard with widgets that group key metrics by function.






    {
    "widgets": [
    {
    "title": "Anomaly Detection Overview",
    "metrics": ["AnomalyDetected", "AnomalyScore", "ResponseTime"]
    },
    {
    "title": "System Health",
    "metrics": ["Lambda Errors", "Kinesis Throughput", "SageMaker Utilization"]
    },
    {
    "title": "Security Actions",
    "metrics": ["Users Blocked", "Rate Limited", "Incidents Created"]
    }
    ]
    }
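    The JSON above is a simplified outline. An actual CloudWatch dashboard body uses `metric` widgets whose `properties.metrics` entries are `[namespace, metricName]` arrays, optionally ending with a rendering-options object. A sketch of the first widget published with boto3's `put_dashboard`, assuming the custom metrics live in a hypothetical `AnomalyGuardian` namespace:

```python
import json

NAMESPACE = "AnomalyGuardian"  # hypothetical custom metric namespace


def build_dashboard_body(region="us-east-1"):
    return {
        "widgets": [
            {
                "type": "metric",
                "x": 0, "y": 0, "width": 12, "height": 6,
                "properties": {
                    "title": "Anomaly Detection Overview",
                    "region": region,
                    "stat": "Sum",
                    "period": 300,
                    "metrics": [
                        [NAMESPACE, "AnomalyDetected"],
                        [NAMESPACE, "AnomalyScore", {"stat": "Average"}],
                        [NAMESPACE, "ResponseTime", {"stat": "p95"}],
                    ],
                },
            },
        ]
    }


def publish_dashboard(name="anomaly-guardian-prod"):
    import boto3  # imported lazily so the body can be built without AWS access
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_dashboard(DashboardName=name, DashboardBody=json.dumps(build_dashboard_body()))
```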







    Configured alerts

    The system continuously monitors critical thresholds to trigger automated alerts. These alerts are divided into two levels: Critical and Warning, based on severity and impact.


    Critical alerts

    • Anomaly Score > 0.9 for 3 consecutive periods
      Indicates highly suspicious behavior sustained over time.
    • Lambda Error Rate > 5%
      A spike in function errors, usually signaling a misconfiguration or downstream issue.
    • SageMaker Latency > 5 seconds
      When model response time degrades significantly, this alert is triggered to prevent detection delays.


    Warning alerts

    • Anomaly Score > 0.7 for 5 periods:
      Less severe but still notable; often an early sign of unusual behavior emerging.
    • Kinesis Utilization > 80%:
      May indicate stream saturation; important to track before hitting capacity limits.
    • Event Correlation Failure:
      Triggered when expected correlations across dimensions break, which might signal fragmented or inconsistent data.


    Security

    Security is embedded at every layer of the architecture, from encrypted communication channels to granular access policies and network isolation.


    Cryptography

    In transit:
    • TLS 1.2+ enforced across all communications
    • SSL/TLS certificates managed through AWS Certificate Manager


    At Rest:
    • Kinesis: Encrypted with AWS-managed KMS keys
    • DynamoDB: Server-side encryption enabled
    • S3: AES-256 encryption for stored objects
    • Elasticsearch: Encryption at rest enabled


    Access control

    IAM Roles and Policies:






    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Effect": "Allow",
    "Action": [
    "kinesis:PutRecord",
    "kinesis:PutRecords"
    ],
    "Resource": "arn:aws:kinesis:*:*:stream/anomaly-guardian-*"
    }
    ]
    }







    VPC and Network Security

    • Critical resources are deployed in private subnets
    • Security Groups follow least privilege access rules
    • Network ACLs (NACLs) are used for additional subnet-level control


    Audit

    CloudTrail integration

    • Logs all administrative actions performed within the environment.
    • Integrated with AWS Config for configuration tracking and compliance auditing.
    • Log retention configured for 7 years, supporting long-term audit requirements.


    GDPR

    • Pseudonymization of personal data where applicable.
    • Right to be forgotten implementation included for sensitive datasets.
    • Clearly defined data retention policies in line with data protection laws.


    Performance and Scalability

    Understanding how the system performs under load and how it scales is key to ensuring it can handle real-world demands without degradation.


    Performance Benchmarks

    Maximum Throughput
    • API Gateway: Up to 10,000 RPS (with caching enabled)
    • Kinesis: Scales with 1,000 shards (1MB/s per shard)
    • Lambda: Supports 1,000 concurrent executions
    • SageMaker: Up to 1,000 inferences per second


    Typical Latency
    • P50 (median):
    • P95:
    • P99:


    Auto-Scaling Configuration

    Horizontal Scaling






    SageMaker:
      MinInstances: 1
      MaxInstances: 10
      TargetUtilization: 70%

    Lambda:
      ReservedConcurrency: 100
      BurstConcurrency: 1000

    Kinesis:
      AutoScaling: Enabled
      TargetUtilization: 70%







    Vertical Scaling
    • SageMaker endpoints scale automatically based on inference load.
    • DynamoDB uses on-demand billing, scaling read/write capacity as needed.
    • Elasticsearch (OpenSearch) includes auto-scaling for storage.


    Maintenance and Operations

    Operational excellence isn’t just about automation; it also involves structured routines and a clear playbook for common issues.


    Daily

    • Health metrics verification
    • Review of critical anomalies
    • Configuration backup procedures


    Weekly

    • ML model updates
    • Review of false positives
    • Threshold optimization


    Monthly

    • Trend analysis and anomaly patterns
    • Capacity planning
    • Disaster recovery testing


    Troubleshooting common issues

    High false positive rate






    # Check current thresholds
    aws dynamodb scan --table-name anomaly-rules

    # Adjust sensitivity
    aws dynamodb update-item \
    --table-name anomaly-rules \
    --key '{"ruleId": {"S": "velocity-check"}}' \
    --update-expression "SET threshold = :val" \
    --expression-attribute-values '{":val": {"N": "150"}}'







    SageMaker Endpoint Down






    # Check current endpoint status
    aws sagemaker describe-endpoint --endpoint-name anomaly-detection-endpoint

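    # Roll the endpoint onto a new (or recreated) endpoint configuration if needed
    aws sagemaker update-endpoint \
    --endpoint-name anomaly-detection-endpoint \
    --endpoint-config-name new-config

    # Block until the endpoint is back InService
    aws sagemaker wait endpoint-in-service --endpoint-name anomaly-detection-endpoint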







    Kinesis Stream Throttling






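    # Check stream write throttling over the last hour (GNU date syntax)
    aws cloudwatch get-metric-statistics \
    --namespace AWS/Kinesis \
    --metric-name WriteProvisionedThroughputExceeded \
    --dimensions Name=StreamName,Value=event-stream \
    --start-time "$(date -u -d '-1 hour' +%Y-%m-%dT%H:%M:%SZ)" \
    --end-time "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
    --period 300 \
    --statistics Sum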

    # Increase shard count if needed
    aws kinesis update-shard-count \
    --stream-name event-stream \
    --target-shard-count 16 \
    --scaling-type UNIFORM_SCALING







    Backup & Disaster Recovery

    Backup Strategy

    • DynamoDB: Point-in-time recovery (PITR) enabled
    • S3: Versioning and cross-region replication configured
    • Code: Managed via Git with multiple remote repositories
    • Configuration: Daily backups of Parameter Store values


    RTO / RPO Targets

    • RTO (Recovery Time Objective): Less than 4 hours
    • RPO (Recovery Point Objective): Less than 15 minutes


    Disaster Recovery (DR) procedures

    1. Activate secondary region
    2. Restore DynamoDB backups
    3. Redeploy CloudFormation stacks
    4. Redirect traffic to the failover stack
    5. Perform full system validation


    API Reference

    Event Ingestion

    POST /events

    Main endpoint used for real-time event ingestion into the system.


    Headers






    Content-Type: application/json
    Authorization: AWS4-HMAC-SHA256 ...







    Request Body






    {
    "eventId": "unique-event-id",
    "userId": "user-123",
    "timestamp": "2025-07-24T10:30:00Z",
    "eventType": "transaction",
    "transactionAmount": 250.00,
    "requestsPerSecond": 5,
    "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "geoLocation": "US-East-1",
    "sessionLength": 1800,
    "ipAddress": "192.168.1.100",
    "metadata": {
    "productId": "prod-456",
    "category": "electronics",
    "paymentMethod": "credit_card"
    }
    }







    Response






    {
    "status": "success",
    "message": "Event received",
    "eventId": "unique-event-id",
    "timestamp": "2025-07-24T10:30:01Z"
    }







    Error Responses






    {
    "status": "error",
    "code": "INVALID_PAYLOAD",
    "message": "Missing required field: userId"
    }
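    The validation that produces an `INVALID_PAYLOAD` error might look like the following sketch. The reference does not say which fields are required; `eventId`, `userId`, `timestamp`, and `eventType` are assumed here, along with a numeric check on `transactionAmount`.

```python
from datetime import datetime

REQUIRED_FIELDS = ("eventId", "userId", "timestamp", "eventType")  # assumed


def validate_event(payload):
    """Return None if the payload is acceptable, otherwise an error body."""
    def error(message):
        return {"status": "error", "code": "INVALID_PAYLOAD", "message": message}

    for field in REQUIRED_FIELDS:
        if field not in payload:
            return error(f"Missing required field: {field}")
    try:
        # datetime.fromisoformat() before Python 3.11 rejects a trailing "Z".
        datetime.fromisoformat(payload["timestamp"].replace("Z", "+00:00"))
    except (AttributeError, ValueError):
        return error("Invalid timestamp: expected ISO 8601")
    amount = payload.get("transactionAmount")
    if amount is not None and not isinstance(amount, (int, float)):
        return error("transactionAmount must be a number")
    return None
```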







    Anomaly Query

    GET /anomalies

    Used to retrieve detected anomalies using optional filters.


    Query Parameters
    • severity: CRITICAL, HIGH, MEDIUM, LOW
    • fromDate: ISO 8601 datetime
    • toDate: ISO 8601 datetime
    • userId: ID of a specific user
    • limit: Max number of results (default: 100)


    Example Request






    curl -X GET "https://api.anomaly-guardian.com/anomalies?severity=HIGH&limit=50" \
    -H "Authorization: Bearer "







    Response Example






    {
    "anomalies": [
    {
    "anomalyId": "anom-789",
    "severity": "HIGH",
    "anomalyScore": 0.85,
    "detectedAt": "2025-07-24T10:25:00Z",
    "userId": "user-123",
    "anomalyType": "velocity",
    "description": "Unusual request rate detected",
    "originalEvent": { ... },
    "actionsPerformed": ["rate_limit", "enhanced_monitoring"]
    }
    ],
    "pagination": {
    "nextToken": "eyJ0aW1lc3RhbXAiOiIyMDI1...",
    "hasMore": true
    }
    }
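    Because results are paginated through `nextToken`, a client has to keep requesting pages until `hasMore` is false. A sketch, assuming the token is sent back as a `nextToken` query parameter (the reference above does not say how) and taking the HTTP call as an injected function so the loop can be exercised without the live API:

```python
from urllib.parse import urlencode

BASE_URL = "https://api.anomaly-guardian.com/anomalies"


def build_url(params, next_token=None):
    query = dict(params)
    if next_token:
        query["nextToken"] = next_token  # assumed parameter name
    return f"{BASE_URL}?{urlencode(query)}"


def fetch_all_anomalies(fetch_json, **params):
    """Yield anomalies across pages. `fetch_json(url)` must return the parsed response."""
    token = None
    while True:
        page = fetch_json(build_url(params, token))
        yield from page.get("anomalies", [])
        pagination = page.get("pagination", {})
        if not pagination.get("hasMore"):
            break
        token = pagination["nextToken"]
```

    In practice `fetch_json` could be something like `lambda url: requests.get(url, headers={"Authorization": "Bearer ..."}, timeout=10).json()`.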







    Rule management

    Create Rule

    POST /rules

    Request Body






    {
    "ruleName": "high-velocity-check",
    "ruleType": "velocity",
    "description": "Detect high request velocity",
    "conditions": {
    "metric": "requestsPerSecond",
    "operator": "greater_than",
    "threshold": 100,
    "timeWindow": "5m"
    },
    "actions": [
    {
    "type": "rate_limit",
    "parameters": {
    "maxRequestsPerMinute": 10,
    "duration": "1h"
    }
    },
    {
    "type": "notify",
    "parameters": {
    "channels": ["email", "slack"],
    "severity": "HIGH"
    }
    }
    ],
    "enabled": true
    }
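    To show how a rule like this could be evaluated, here is a sketch that averages the rule's `metric` over its `timeWindow` and applies the `operator`. The averaging semantics, the duration parser, and every operator name other than `greater_than` are assumptions about how the engine interprets rules.

```python
import operator

OPERATORS = {
    "greater_than": operator.gt,
    "less_than": operator.lt,   # assumed
    "equals": operator.eq,      # assumed
}
UNITS = {"s": 1, "m": 60, "h": 3600}


def parse_window(text):
    """Convert '30s' / '5m' / '1h' into seconds."""
    return int(text[:-1]) * UNITS[text[-1]]


def rule_matches(rule, samples, now):
    """samples: list of (epoch_seconds, event_dict). Averages the metric over the window."""
    cond = rule["conditions"]
    start = now - parse_window(cond["timeWindow"])
    values = [e[cond["metric"]] for ts, e in samples if ts > start and cond["metric"] in e]
    if not rule.get("enabled", True) or not values:
        return False
    avg = sum(values) / len(values)
    return OPERATORS[cond["operator"]](avg, cond["threshold"])
```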







    Update rule

    PUT /rules/{ruleId}

    Used to update an existing anomaly detection rule.


    Delete rule

    DELETE /rules/{ruleId}

    Soft-deletes an existing rule: the rule is deactivated but remains stored.

    Webhook Configuration

    Register Webhook

    POST /webhooks


    Request Body






    {
    "url": "https://your-system.com/webhooks/anomaly",
    "events": ["anomaly.detected", "anomaly.resolved"],
    "severityFilter": ["CRITICAL", "HIGH"],
    "secret": "webhook-secret-key",
    "retryPolicy": {
    "maxRetries": 3,
    "backoffMultiplier": 2
    }
    }
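    The reference does not say how `secret` is used. A common pattern, assumed here, is that the sender signs the raw request body with HMAC-SHA256 and the receiver recomputes the signature and compares it in constant time; the `sha256=` prefix is hypothetical. The last function shows the retry delays implied by `maxRetries: 3` and `backoffMultiplier: 2`, assuming a 1-second base delay.

```python
import hashlib
import hmac


def sign(body: bytes, secret: str) -> str:
    """HMAC-SHA256 signature of the raw request body."""
    return "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def verify(body: bytes, secret: str, received_signature: str) -> bool:
    """Constant-time comparison avoids leaking the signature through timing."""
    return hmac.compare_digest(sign(body, secret), received_signature)


def retry_delays(max_retries=3, backoff_multiplier=2, base_delay=1.0):
    """Exponential backoff schedule: base, base*m, base*m^2, ..."""
    return [base_delay * backoff_multiplier ** attempt for attempt in range(max_retries)]
```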







    Advanced Use Cases

    1. E-commerce Fraud Detection
      Scenario:


    An online store with over 1 million transactions per day needs to detect fraudulent behavior in real time.


    Specific configuration:






    Anomaly Rules:
    - High-Value Transactions: >$5,000
    - Velocity Attacks: >50 attempts/minute
    - Geographic Anomalies: Multiple locations within
    - Device Fingerprinting: Suspicious device changes

    Response Actions:
    - Automatic Transaction Hold
    - Multi-factor Authentication Request
    - Manual Review Queue
    - Customer Communication







    Success metrics:
    • 95% reduction in undetected fraud attempts
    • Detection time under 2 seconds
    • False positive rate below 0.1%

    2. API Security Monitoring
      Scenario:


    Protect critical APIs against DDoS attacks and abusive behavior.


    Specific configuration:






    Protection Layers:
    - Rate Limiting: By IP, user, and endpoint
    - Behavior Analysis: Unusual usage patterns
    - Bot Detection: Based on user-agent and behavioral fingerprinting
    - Geo-blocking: Restrict access from high-risk countries

    Auto-Response:
    - IP Blacklisting (temporary)
    - CAPTCHA Challenge
    - Traffic Throttling
    - Incident Creation






    3. IoT Device Monitoring
      Scenario:


    Monitoring of 100k+ IoT devices to detect anomalies, failures, and potential attacks.


    Specific Configuration:






    Sensor Metrics:
    - Temperature Anomalies
    - Connectivity Patterns
    - Power Consumption
    - Data Transmission Rates

    ML Models:
    - Time Series Forecasting
    - Clustering for Device Behavior
    - Outlier Detection
    - Predictive Maintenance







    Machine Learning details

    This section outlines the models used for anomaly detection, including feature selection and detection logic.

    1. Isolation Forest




    from sklearn.ensemble import IsolationForest

    model = IsolationForest(
    contamination=0.1, # 10% expected anomalies
    random_state=42,
    n_estimators=100,
    max_samples='auto'
    )

    # Selected Features
    features = [
    'transaction_amount_normalized',
    'requests_per_second',
    'session_length',
    'hour_of_day',
    'day_of_week',
    'geo_distance_from_usual',
    'device_change_indicator'
    ]






    2. LSTM Autoencoder (For Time Series)




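    import numpy as np
    import tensorflow as tf

    timesteps = 30   # length of each input window (illustrative)
    n_features = 7   # features per timestep, e.g. the list above

    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(timesteps, n_features)),
        tf.keras.layers.LSTM(50, return_sequences=True),
        tf.keras.layers.LSTM(30, return_sequences=False),
        tf.keras.layers.RepeatVector(timesteps),
        tf.keras.layers.LSTM(30, return_sequences=True),
        tf.keras.layers.LSTM(50, return_sequences=True),
        tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(n_features)),
    ])
    model.compile(optimizer="adam", loss="mse")

    # X_train: windows of normal traffic, shape (n_windows, timesteps, n_features)
    # model.fit(X_train, X_train, epochs=20, batch_size=64)
    reconstructions = model.predict(X_train)
    reconstruction_errors = np.mean(np.square(X_train - reconstructions), axis=(1, 2))

    # Anomaly is detected when reconstruction error > threshold
    threshold = np.percentile(reconstruction_errors, 95)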






    3. Statistical Process Control (SPC)




    import pandas as pd

    def calculate_control_limits(data: pd.Series, window_size=30):
        """Calculates dynamic control limits (rolling mean ± 3 standard deviations)"""
        rolling_mean = data.rolling(window=window_size).mean()
        rolling_std = data.rolling(window=window_size).std()

        upper_limit = rolling_mean + 3 * rolling_std
        lower_limit = rolling_mean - 3 * rolling_std

        return upper_limit, lower_limit

    def detect_anomaly(value, upper_limit, lower_limit):
        """Detects anomaly if a single value is outside its control limits.
        For a whole Series, use (data > upper_limit) | (data < lower_limit)."""
        return value > upper_limit or value < lower_limit







    Feature engineering pipeline

    Applied Transformations

    1. Normalization: Z-score normalization for numerical features.
    2. Categorical Encoding: One-hot encoding for categorical variables.
    3. Time Features: Extracted features such as hour, day of week, and month.
    4. Aggregations: Moving averages, percentiles, counters.
    5. Geo Features: Distance from typical geolocation patterns.
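    A compact sketch of transformations 1 to 3 applied to a single event in plain Python. The payment-method vocabulary and the means and standard deviations used for z-scoring are placeholder values; in practice they would come from training data or the Feature Store.

```python
from datetime import datetime

# Placeholder statistics (mean, std) and vocabulary; real values come from training data.
STATS = {"transactionAmount": (180.0, 120.0), "sessionLength": (900.0, 600.0)}
PAYMENT_METHODS = ["credit_card", "debit_card", "wallet"]


def zscore(value, mean, std):
    return (value - mean) / std if std else 0.0


def engineer_features(event):
    ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    # 1. Z-score normalization of numeric fields
    features = {f"{name}_z": zscore(event[name], *STATS[name]) for name in STATS}
    # 2. One-hot encoding of a categorical field
    method = event["metadata"]["paymentMethod"]
    for option in PAYMENT_METHODS:
        features[f"payment_{option}"] = 1 if method == option else 0
    # 3. Time features (weekday(): Monday = 0)
    features.update(hour_of_day=ts.hour, day_of_week=ts.weekday(), month=ts.month)
    return features
```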


    Feature Selection






    from sklearn.feature_selection import SelectKBest, f_classif

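    # X: feature matrix, y: labels (e.g. 1 = analyst-confirmed anomaly, 0 = normal)
    # Select the 20 features most associated with the labels (ANOVA F-test)
    selector = SelectKBest(score_func=f_classif, k=20)
    X_selected = selector.fit_transform(X, y)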

    # Automatically selected features
    selected_features = selector.get_support(indices=True)







    Model Training pipeline

    Data Pipeline





    Steps:
    1. Data Extraction:
       - Source: Kinesis Data Streams
       - Format: JSON events
       - Volume: 1M+ events/day

    2. Data Preprocessing:
       - Cleaning: Remove duplicates, handle missing values
       - Transformation: Apply feature engineering
       - Validation: Data quality checks

    3. Feature Store:
       - Storage: SageMaker Feature Store
       - Serving: Real-time and batch
       - Monitoring: Feature drift detection

    4. Model Training:
       - Framework: SageMaker Training Jobs
       - Algorithms: Multiple models ensemble
       - Validation: Cross-validation + holdout

    5. Model Evaluation:
       - Metrics: Precision, Recall, F1, AUC
       - Business Metrics: Cost of false positives/negatives
       - A/B Testing: Champion/Challenger comparison

    6. Model Deployment:
       - Staging: Canary deployment
       - Production: Blue/green deployment
       - Monitoring: Real-time performance tracking







    Continuous Learning

    Model Retraining Strategy

    • Scheduled Retraining: Weekly retraining with newly collected data.
    • Performance-Based: Triggered when evaluation metrics drop below acceptable thresholds.
    • Concept Drift Detection: Ongoing monitoring of data distribution and feature drift in real time.
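    Drift can be measured in many ways; one widely used and simple measure is the Population Stability Index (PSI), which compares a feature's binned distribution in recent traffic with the training baseline. PSI is named here as an example technique, not necessarily the one the project uses, and the 10 bins and 0.2 alert level are common rules of thumb rather than fixed standards.

```python
from math import log


def psi(baseline, recent, bins=10, eps=1e-6):
    """Population Stability Index between two samples of one numeric feature."""
    lo, hi = min(baseline), max(baseline)
    width = (hi - lo) / bins or 1.0  # guard against a constant baseline

    def proportions(values):
        counts = [0] * bins
        for v in values:
            # Clamp out-of-range values into the first/last bin.
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # Floor at eps so empty bins do not produce log(0).
        return [max(c / len(values), eps) for c in counts]

    p, q = proportions(baseline), proportions(recent)
    return sum((qi - pi) * log(qi / pi) for pi, qi in zip(p, q))


def drift_detected(baseline, recent, threshold=0.2):
    return psi(baseline, recent) > threshold
```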


    Feedback Loop

    Incorporating analyst feedback into model updates allows the system to continuously learn from real-world validation, improving accuracy over time.






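    def update_model_with_feedback(feedback_data):
        """Update model with analyst feedback.

        Assumes `model` supports incremental learning via partial_fit (e.g. an
        SGD-based classifier); IsolationForest does not and would need a full refit.
        The helper functions and globals used here are placeholders.
        """
        global current_performance

        # Combine analyst labels with original features
        labeled_data = combine_feedback_with_features(feedback_data)

        # Incremental model retraining
        model.partial_fit(labeled_data.features, labeled_data.labels)

        # Validate new performance
        new_performance = evaluate_model(model, validation_set)

        # Only promote the model if it beats the current baseline
        if new_performance > current_performance:
            deploy_model(model)
            log_model_update(new_performance)
            current_performance = new_performance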







    Reports & Analytics

    Executive Dashboard


    Key KPIs:
    • Security Posture Score: 0–100 based on multiple metrics.
    • Threat Detection Rate: % of threats detected vs. total.
    • False Positive Rate: % of alerts that were false positives.
    • Mean Time to Detection: Average time to detect anomalies.


    Visualizations:






    {
    "widgets": [
    {
    "type": "scorecard",
    "title": "Security Score",
    "value": 94,
    "trend": "+2% from last week"
    },
    {
    "type": "timeseries",
    "title": "Anomalies Over Time",
    "data": "hourly_anomaly_counts"
    },
    {
    "type": "heatmap",
    "title": "Anomaly Types by Hour",
    "data": "anomaly_type_hour_matrix"
    },
    {
    "type": "geographic",
    "title": "Geographic Distribution",
    "data": "anomalies_by_location"
    }
    ]
    }







    Automated reports

    Daily Report:






    from datetime import date

    def generate_daily_report():
        # get_* helpers, generate_recommendations, send_report and store_report
        # are project-specific (placeholders here)
        report = {
            "date": date.today().isoformat(),
            "summary": {
                "total_events_processed": get_event_count(),
                "anomalies_detected": get_anomaly_count(),
                "actions_performed": get_action_count(),
                "system_performance": get_performance_metrics()
            },
            "top_anomalies": get_top_anomalies(limit=10),
            "system_health": get_health_status(),
            "recommendations": generate_recommendations()
        }

        send_report(report, recipients=["security-team@company.com"])
        store_report(report, s3_bucket="reports-bucket")







    Weekly report:
    • Trend analysis
    • ML model performance
    • Threat evolution
    • Adjustment recommendations


    Monthly report:
    • Solution ROI
    • Industry benchmarks
    • Capacity planning
    • Improvement roadmap


    CI/CD Pipeline

    Infrastructure as Code


    GitOps Workflow:





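    # .github/workflows/deploy.yml
    name: Deploy Anomaly Guardian

    on:
      push:
        branches: [main]
      pull_request:
        branches: [main]

    # Required for OIDC federation with AWS (configure-aws-credentials)
    permissions:
      id-token: write
      contents: read

    env:
      AWS_REGION: us-east-1

    jobs:

      validate:
        runs-on: ubuntu-latest
        steps:
          - uses: actions/checkout@v4
          # AWS_DEPLOY_ROLE_ARN is a placeholder secret holding the deploy role ARN
          - uses: aws-actions/configure-aws-credentials@v4
            with:
              role-to-assume: ${{ secrets.AWS_DEPLOY_ROLE_ARN }}
              aws-region: ${{ env.AWS_REGION }}
          - name: Validate CloudFormation
            run: |
              aws cloudformation validate-template \
                --template-body file://anomaly-guardian-base.yaml
              aws cloudformation validate-template \
                --template-body file://anomaly-guardian-advanced.yaml

      test:
        runs-on: ubuntu-latest
        steps:
          - uses: actions/checkout@v4
          - name: Run Tests
            run: |
              pip install pytest boto3 requests
              python -m pytest tests/
              python -m pytest integration_tests/

      security-scan:
        runs-on: ubuntu-latest
        steps:
          - uses: actions/checkout@v4
          - name: Security Scan
            run: |
              pip install checkov cfn-lint
              checkov -f anomaly-guardian-base.yaml
              cfn-lint anomaly-guardian-advanced.yaml

      deploy-staging:
        needs: [validate, test, security-scan]
        if: github.ref == 'refs/heads/main'
        runs-on: ubuntu-latest
        steps:
          - uses: actions/checkout@v4
          - uses: aws-actions/configure-aws-credentials@v4
            with:
              role-to-assume: ${{ secrets.AWS_DEPLOY_ROLE_ARN }}
              aws-region: ${{ env.AWS_REGION }}
          - name: Deploy to Staging
            run: ./deploy.sh deploy
            env:
              ENVIRONMENT: staging

      integration-tests:
        needs: deploy-staging
        runs-on: ubuntu-latest
        steps:
          - uses: actions/checkout@v4
          - uses: aws-actions/configure-aws-credentials@v4
            with:
              role-to-assume: ${{ secrets.AWS_DEPLOY_ROLE_ARN }}
              aws-region: ${{ env.AWS_REGION }}
          - name: Run Integration Tests
            run: ./deploy.sh test
            env:
              ENVIRONMENT: staging

      deploy-production:
        needs: integration-tests
        if: github.ref == 'refs/heads/main'
        runs-on: ubuntu-latest
        steps:
          - uses: actions/checkout@v4
          - uses: aws-actions/configure-aws-credentials@v4
            with:
              role-to-assume: ${{ secrets.AWS_DEPLOY_ROLE_ARN }}
              aws-region: ${{ env.AWS_REGION }}
          - name: Deploy to Production
            run: ./deploy.sh deploy
            env:
              ENVIRONMENT: prod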







    Testing Strategy

    Unit Tests






    import pytest
    from anomaly_detector import AnomalyDetector

    class TestAnomalyDetection:
        def setup_method(self):
            self.detector = AnomalyDetector()

        def test_velocity_anomaly_detection(self):
            event = {
                "userId": "test-user",
                "requestsPerSecond": 150
            }
            result = self.detector.detect(event)
            assert result.is_anomaly is True
            assert result.anomaly_type == "velocity"

        def test_normal_behavior(self):
            event = {
                "userId": "test-user",
                "requestsPerSecond": 10
            }
            result = self.detector.detect(event)
            assert result.is_anomaly is False







    Integration Tests






    import boto3
    import json
    import time

    def test_end_to_end_anomaly_detection():
        # Send an anomalous event (create_anomalous_event is a test helper)
        kinesis = boto3.client("kinesis")
        event = create_anomalous_event()

        kinesis.put_record(
            StreamName="anomaly-guardian-test-event-stream",
            Data=json.dumps(event),
            PartitionKey=event["userId"]
        )

        # Wait for processing
        time.sleep(90)

        # Verify if anomaly was stored
        dynamodb = boto3.resource("dynamodb")
        table = dynamodb.Table("anomaly-guardian-test-anomaly-history")

        response = table.get_item(
            Key={"anomalyId": event["eventId"]}
        )

        assert "Item" in response
        assert response["Item"]["severity"] == "HIGH"







    Performance Tests






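    import concurrent.futures

    import requests

    # Placeholders: the ingestion URL and a valid payload such as the Request Body example
    API_ENDPOINT = "https://api.anomaly-guardian.com/events"
    test_event = {"eventId": "perf-test", "userId": "perf-user",
                  "timestamp": "2025-07-24T10:30:00Z", "eventType": "transaction"}

    def test_throughput():
        """Test maximum system throughput"""

        def send_event():
            # Send event via API Gateway
            response = requests.post(API_ENDPOINT, json=test_event, timeout=10)
            return response.status_code == 200

        # Run test with 1000 concurrent events
        with concurrent.futures.ThreadPoolExecutor(max_workers=1000) as executor:
            futures = [executor.submit(send_event) for _ in range(1000)]
            results = [f.result() for f in futures]

        success_rate = sum(results) / len(results)
        assert success_rate >= 0.99  # 99% success rate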







    Cost Estimation - Production Environment (2M events/day)

    1. Compute


      AWS Lambda: Responsible for processing incoming events. Estimated cost is $45/month, considering invocation volume and average execution time.


      SageMaker Endpoint: Hosts real-time ML inference. Due to the need for a persistent endpoint, the cost is relatively higher, around $180/month.


      Step Functions: Manages orchestration between different stages of the processing flow. The estimated cost is $25/month.
    2. Storage


      DynamoDB: Used to store anomaly history and related metadata. With high read/write throughput, cost is estimated at $120/month.


      S3 (Simple Storage Service): Stores logs, backups, and reports. Assuming 1 TB of monthly traffic, storage costs are around $35/month.


      Elasticsearch (OpenSearch): Powers fast search and analytics on event data. With high availability enabled, estimated cost is $200/month.
    3. Streaming


      Kinesis Data Streams: Handles real-time event ingestion. Based on provisioned throughput, the cost is $150/month.


      Kinesis Data Analytics: Runs real-time SQL queries on incoming streams. This service contributes approximately $100/month.
    4. Networking


      API Gateway: Serves HTTP endpoints to receive and route events. For high-volume access, the estimated monthly cost is $35.


      Data Transfer: Covers outbound data (egress) from AWS. Estimated cost is $65/month, assuming standard internet traffic volumes.


    Total Monthly Estimate: US$955/month
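    The line items add up as follows. Dividing by the stated volume (2M events/day, taken here as a 30-day month) gives an approximate unit cost, which is handy when comparing against alternatives.

```python
monthly_costs_usd = {
    "Lambda": 45, "SageMaker Endpoint": 180, "Step Functions": 25,
    "DynamoDB": 120, "S3": 35, "OpenSearch": 200,
    "Kinesis Data Streams": 150, "Kinesis Data Analytics": 100,
    "API Gateway": 35, "Data Transfer": 65,
}

total = sum(monthly_costs_usd.values())
events_per_month = 2_000_000 * 30  # assumes a 30-day month
cost_per_million_events = total / (events_per_month / 1_000_000)

print(f"Total: ${total}/month")                               # Total: $955/month
print(f"~${cost_per_million_events:.2f} per million events")  # ~$15.92 per million events
```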


    Cost Optimization Strategies:

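    1. SageMaker Savings Plans (SageMaker's commitment-based discount, in place of Reserved Instances): around 40% savings on SageMaker



    # Purchase a SageMaker Savings Plan (offering ID and hourly commitment are placeholders;
    # list available offerings with: aws savingsplans describe-savings-plans-offerings)
    aws savingsplans create-savings-plan \
    --savings-plan-offering-id "offering-123" \
    --commitment "2.00"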






    2. DynamoDB On-Demand: Savings for variable workloads




    BillingMode: PAY_PER_REQUEST # vs PROVISIONED






    3. S3 Intelligent Tiering: Automatic storage cost savings




    StorageClass: INTELLIGENT_TIERING






    4. Lambda Provisioned Concurrency: Optimization for cold starts




    ProvisionedConcurrency: 10 # For critical functions only







    Cost Monitoring

    Budget alerts





    MonthlyBudget:
      Amount: 1000
      Unit: USD
      Alerts:
        - Threshold: 80%
          Type: ACTUAL
        - Threshold: 100%
          Type: FORECASTED
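    The same budget can be created through the AWS Budgets API. The sketch below builds the `create_budget` arguments for a $1,000 monthly cost budget with an 80% actual and a 100% forecasted alert; the budget name, account ID, and e-mail subscriber are placeholders.

```python
def budget_request(account_id, email, amount="1000"):
    def notification(kind, threshold):
        return {
            "Notification": {
                "NotificationType": kind,  # "ACTUAL" or "FORECASTED"
                "ComparisonOperator": "GREATER_THAN",
                "Threshold": threshold,
                "ThresholdType": "PERCENTAGE",
            },
            "Subscribers": [{"SubscriptionType": "EMAIL", "Address": email}],
        }

    return {
        "AccountId": account_id,
        "Budget": {
            "BudgetName": "anomaly-guardian-monthly",  # placeholder name
            "BudgetLimit": {"Amount": amount, "Unit": "USD"},
            "TimeUnit": "MONTHLY",
            "BudgetType": "COST",
        },
        "NotificationsWithSubscribers": [
            notification("ACTUAL", 80.0),
            notification("FORECASTED", 100.0),
        ],
    }


def create_budget(account_id, email):
    import boto3  # lazy import: building the request needs no AWS access
    boto3.client("budgets").create_budget(**budget_request(account_id, email))
```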







    Cost Anomaly Detection:





    import boto3

    def detect_cost_anomalies():
        """Detect anomalies in AWS costs via Cost Explorer anomaly detection"""
        ce = boto3.client("ce")

        response = ce.get_anomalies(
            DateInterval={
                "StartDate": "2025-07-01",
                "EndDate": "2025-07-24"
            }
        )

        for anomaly in response["Anomalies"]:
            if anomaly["Impact"]["TotalImpact"] > 100:  # USD impact that triggers an alert
                send_cost_alert(anomaly)  # project-specific helper (placeholder)







    Advanced Security Features

    Encryption Everywhere

    Data Classification:






    Sensitive Data:
    - User IDs: Pseudonymized
    - IP Addresses: Hashed
    - Transaction Amounts: Encrypted
    - Geographic Data: Generalized

    Public Data:
    - Aggregate Statistics
    - System Performance Metrics
    - Non-PII Event Metadata
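    For the pseudonymized and hashed fields, a keyed hash (HMAC) is preferable to a plain hash: the IPv4 space holds only about 4.3 billion addresses, so an unkeyed SHA-256 of an IP can be reversed by brute force. A minimal sketch, assuming the key is loaded from a secret store (not shown), plus simple coordinate rounding as one way to generalize geographic data:

```python
import hashlib
import hmac


def pseudonymize(value: str, key: bytes) -> str:
    """Deterministic keyed token: the same input and key always give the same token."""
    return hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()


def generalize_coordinates(lat: float, lon: float, decimals: int = 1):
    """Round coordinates; one decimal place is roughly 11 km of latitude."""
    return round(lat, decimals), round(lon, decimals)
```

    Because the tokens are deterministic, joins and per-user counters still work on pseudonymized data, while rotating or destroying the key breaks the link back to the original values.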







    Key Management:






    KMS Keys:
      Application Key:
        Description: "Anomaly Guardian Application Encryption"
        KeyRotation: Enabled
        Aliases: ["alias/anomaly-guardian-app"]

      Database Key:
        Description: "DynamoDB Encryption Key"
        KeyRotation: Enabled
        Aliases: ["alias/anomaly-guardian-db"]







    Network Security

    VPC Configuration:






    Network Architecture:
      VPC: 10.0.0.0/16

      Public Subnets:
        - 10.0.101.0/24 (AZ-1)
        - 10.0.102.0/24 (AZ-2)

      Private Subnets:
        - 10.0.1.0/24 (AZ-1)
        - 10.0.2.0/24 (AZ-2)

      Database Subnets:
        - 10.0.201.0/24 (AZ-1)
        - 10.0.202.0/24 (AZ-2)







    Security Groups:






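    Lambda Security Group:
      Ingress: None  # Lambda is invoked through the Lambda service, so no inbound rules are needed
      Egress:
        - DynamoDB VPC endpoint
        - Elasticsearch cluster (HTTPS 443)

    API Gateway Security Group:
      # A public REST API is not attached to a security group; it is protected by
      # WAF, resource policies, and authorization. Security groups apply only to the
      # execute-api VPC endpoint used by private APIs.
      Ingress:
        - HTTPS (443) from anywhere
      Egress: None (managed service)

    Elasticsearch Security Group:
      Ingress:
        - Port 443 from Lambda SG  # Amazon OpenSearch Service domains serve HTTPS on 443
      Egress: None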







    WAF Protection:






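    WAF Rules:
      - Name: "AWSManagedRulesCommonRuleSet"
        Priority: 1
        OverrideAction:
          None: {}  # keep the managed rule group's own actions
        Statement:
          ManagedRuleGroupStatement:
            VendorName: "AWS"
            Name: "AWSManagedRulesCommonRuleSet"

      - Name: "RateLimitRule"
        Priority: 2
        Action:
          Block: {}
        Statement:
          RateBasedStatement:
            Limit: 2000  # requests per IP within the evaluation window (5 minutes by default)
            AggregateKeyType: "IP"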







    When dealing with highly complex environments and high data volumes, the response time between identifying an anomaly and taking action is crucial. More than just a functional architecture, what is presented here is an approach focused on operational reliability, processing scalability, and cost control.


    Each component has been integrated with a purpose: to accurately detect deviations, trigger automated responses, and maintain active governance at every level, from security to resource allocation. This type of solution becomes truly effective when aligned with the organization's technical routine and business objectives. It is not just about technology, but about delivering predictability in scenarios where failures are costly.


    Thank you, see you next time.



