ImplementingPerformance optimization with AWS Aurora PostgreSQL/MySQL for high-throughput workloads πŸ‘¨β€πŸ’»βš™

Collapse
X
 
  • Time
  • Show
Clear All
new posts
  • MyrinNew
    Senior Member
    • Feb 2024
    • 5168

    #1

    ImplementingPerformance optimization with AWS Aurora PostgreSQL/MySQL for high-throughput workloads πŸ‘¨β€πŸ’»βš™

    Amazon Aurora is AWS’s cloud-native relational database, built for high performance and high availability while staying compatible with MySQL and PostgreSQL. It’s faster and more efficient than standard deployments, but if you’re handling high-throughput workloads, a few extra optimizations can make a big difference.


    In this guide, we’ll walk through everything you need to know to fine-tune Aurora for demanding use cases. From identifying performance bottlenecks to optimizing queries, leveraging Aurora’s advanced features, and setting up proper monitoringβ€”this article has got you covered!


    πŸ” Understanding Aurora’s Architecture

    Before jumping into optimizations, let’s take a quick look at what makes Aurora different:
    • Distributed Storage – Aurora automatically replicates data across multiple Availability Zones (AZs) in an AWS Region. This separation of compute and storage allows for independent scaling and improved resilience.
    • Fault-Tolerant Storage System – Your data is stored in six copies across three AZs, ensuring durability and availability.
    • Efficient Write Operations – Aurora handles replication asynchronously, allowing high-throughput performance even for write-heavy applications.


    With this foundation in mind, let’s dive into the best ways to optimize your Aurora database for peak performance! πŸš€


    Key Factors Affecting Performance

    Instance Types and Sizing

    Aurora performance is significantly influenced by the instance class you choose. For high-throughput workloads, consider:
    • Memory-optimized instance types (r5, r6g, r6i) for workloads with large working sets
    • The latest generation instances which typically offer better performance
    • Instance size appropriate for your workload's CPU and memory requirements


    Network Bandwidth

    Network bandwidth can become a bottleneck for high-throughput workloads, especially with larger instance types. Ensure you're using instance types with sufficient network bandwidth for your workload requirements.


    Hands-On Lab: Setting Up an Optimized Aurora Environment

    Let's start with creating an Aurora environment optimized for high throughput.


    Step 1: Create an Aurora Cluster with Appropriate Configuration





    aws rds create-db-cluster \
    --db-cluster-identifier high-throughput-demo \
    --engine aurora-postgresql \
    --engine-version 15.3 \
    --master-username admin \
    --master-user-password YourStrongPassword123! \
    --db-subnet-group-name your-subnet-group \
    --vpc-security-group-ids sg-0123456789abcdef \
    --backup-retention-period 7 \
    --preferred-backup-window 07:00-09:00 \
    --preferred-maintenance-window sat:10:00-sat:12:00 \
    --db-cluster-parameter-group-name high-throughput-params \
    --storage-encrypted \
    --region us-east-1








    Step 2: Create Instance(s) with Appropriate Sizing





    aws rds create-db-instance \
    --db-instance-identifier high-throughput-writer \
    --db-cluster-identifier high-throughput-demo \
    --engine aurora-postgresql \
    --db-instance-class db.r6g.4xlarge \
    --region us-east-1








    For read scaling:






    aws rds create-db-instance \
    --db-instance-identifier high-throughput-reader-1 \
    --db-cluster-identifier high-throughput-demo \
    --engine aurora-postgresql \
    --db-instance-class db.r6g.2xlarge \
    --region us-east-1








    Step 3: Configure Parameter Groups

    Create a custom parameter group:






    aws rds create-db-cluster-parameter-group \
    --db-cluster-parameter-group-name high-throughput-params \
    --db-parameter-group-family aurora-postgresql15 \
    --description "Parameters optimized for high throughput" \
    --region us-east-1








    Modify parameters for PostgreSQL:






    aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name high-throughput-params \
    --parameters "ParameterName=max_connections,ParameterValue=5000 ,ApplyMethod=pending-reboot" \
    --region us-east-1

    aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name high-throughput-params \
    --parameters "ParameterName=shared_buffers,ParameterValue={DBIn stanceClassMemory/32768},ApplyMethod=pending-reboot" \
    --region us-east-1








    Schema Optimization Techniques

    Efficient Table Design

    Proper table design is fundamental to database performance. For high-throughput workloads:

    1. Choose appropriate data types (e.g., use INT instead of VARCHAR for numeric values)
    2. Normalize tables appropriately, but consider strategic denormalization for performance-critical queries
    3. Implement table partitioning for large tables


    Let's create an example of a partitioned table:






    -- For PostgreSQL
    CREATE TABLE orders (
    order_id BIGINT NOT NULL,
    customer_id INT NOT NULL,
    order_date DATE NOT NULL,
    total_amount DECIMAL(10,2),
    status VARCHAR(20)
    ) PARTITION BY RANGE (order_date);

    -- Create partitions by month
    CREATE TABLE orders_202501 PARTITION OF orders
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

    CREATE TABLE orders_202502 PARTITION OF orders
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

    CREATE TABLE orders_202503 PARTITION OF orders
    FOR VALUES FROM ('2025-03-01') TO ('2025-04-01');








    Indexing Strategy

    Proper indexing is critical for high-throughput workloads:

    1. Create indexes on columns frequently used in WHERE, JOIN, and ORDER BY clauses
    2. Consider composite indexes for queries filtering on multiple columns
    3. Be cautious about over-indexing, as indexes consume space and slow down writes


    Example of creating efficient indexes:






    -- Primary key
    ALTER TABLE orders ADD PRIMARY KEY (order_id, order_date);

    -- Index for frequent queries filtering by customer and date range
    CREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);

    -- Index for status filtering
    CREATE INDEX idx_orders_status ON orders (status);








    For high write throughput, consider using INCLUDE in PostgreSQL indexes to reduce the need for index-only scans:






    CREATE INDEX idx_orders_date_include_status ON orders (order_date) INCLUDE (status);








    Query Optimization

    Analyzing and Improving Slow Queries

    Let's look at how to identify and optimize slow queries:

    1. Enable Performance Insights to track database load and identify top queries
    2. Use the PostgreSQL/MySQL EXPLAIN command to analyze query execution plans




    -- Enable query logging for slow queries (PostgreSQL)
    ALTER SYSTEM SET log_min_duration_statement = '1000'; -- Log queries taking more than 1 second
    ALTER SYSTEM SET log_statement = 'none';
    SELECT pg_reload_conf();

    -- Analyze a query using EXPLAIN ANALYZE
    EXPLAIN ANALYZE
    SELECT o.order_id, o.order_date, o.total_amount, c.customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    WHERE o.order_date BETWEEN '2025-01-01' AND '2025-01-31'
    AND o.status = 'COMPLETED'
    ORDER BY o.total_amount DESC
    LIMIT 100;








    Query Rewriting Techniques

    Some common query rewriting techniques for high-throughput environments:

    1. Use EXISTS instead of IN for checking existence:




    -- Instead of:
    SELECT * FROM orders WHERE customer_id IN (SELECT customer_id FROM premium_customers);

    -- Use:
    SELECT * FROM orders o WHERE EXISTS (
    SELECT 1 FROM premium_customers p WHERE p.customer_id = o.customer_id
    );







    1. Avoid functions on indexed columns:




    -- Instead of:
    SELECT * FROM orders WHERE EXTRACT(MONTH FROM order_date) = 3;

    -- Use:
    SELECT * FROM orders WHERE order_date BETWEEN '2025-03-01' AND '2025-03-31';







    1. Use UNION ALL instead of UNION when duplicates are acceptable:




    SELECT * FROM orders_202501 WHERE status = 'COMPLETED'
    UNION ALL
    SELECT * FROM orders_202502 WHERE status = 'COMPLETED';








    Connection Management

    Aurora connection management is crucial for high-throughput applications:

    1. Implement connection pooling with tools like PgBouncer, ProxySQL, or AWS RDS Proxy
    2. Monitor and tune max_connections parameter based on workload


    Let's set up RDS Proxy:






    aws rds create-db-proxy \
    --db-proxy-name aurora-high-throughput-proxy \
    --engine-family POSTGRESQL \
    --auth '{"AuthScheme":"SECRETS","SecretArn":"arn:aws:secr etsmanager:us-east-1:123456789012:secret:aurora-credentials","IAMAuth":"DISABLED"}' \
    --role-arn arn:aws:iam::123456789012:role/rds-proxy-role \
    --vpc-subnet-ids subnet-0123456789abcdef0 subnet-0123456789abcdef1 \
    --vpc-security-group-ids sg-0123456789abcdef \
    --require-tls \
    --idle-client-timeout 1800 \
    --debug-logging \
    --region us-east-1








    Register the target:






    aws rds register-db-proxy-targets \
    --db-proxy-name aurora-high-throughput-proxy \
    --db-cluster-identifiers high-throughput-demo \
    --region us-east-1








    Leveraging Aurora-Specific Features

    Read Scaling with Aurora Replicas

    For high-throughput read workloads, distribute reads across multiple Aurora Replicas:






    import psycopg2
    import random

    # List of reader endpoints
    reader_endpoints = [
    "high-throughput-reader-1.cluster-ro-abcdefghijkl.us-east-1.rds.amazonaws.com",
    "high-throughput-reader-2.cluster-ro-abcdefghijkl.us-east-1.rds.amazonaws.com"
    ]

    def get_read_connection():
    # Randomly select a reader endpoint
    endpoint = random.choice(reader_endpoints)

    conn = psycopg2.connect(
    host=endpoint,
    port=5432,
    database="mydatabase",
    user="admin",
    password="YourStrongPassword123!"
    )
    return conn

    def get_write_connection():
    # Write connection always goes to the primary instance
    conn = psycopg2.connect(
    host="high-throughput-demo.cluster-abcdefghijkl.us-east-1.rds.amazonaws.com",
    port=5432,
    database="mydatabase",
    user="admin",
    password="YourStrongPassword123!"
    )
    return conn








    Aurora Parallel Query

    Enable Parallel Query for analytical workloads:






    aws rds modify-db-cluster \
    --db-cluster-identifier high-throughput-demo \
    --enable-http-endpoint \
    --region us-east-1








    In PostgreSQL, configure a session to use parallel query:






    -- For PostgreSQL
    SET aurora.parallel_query = ON;

    -- Check if a query uses parallel query
    EXPLAIN
    SELECT customer_id, SUM(total_amount)
    FROM orders
    WHERE order_date BETWEEN '2025-01-01' AND '2025-03-31'
    GROUP BY customer_id;








    Monitoring and Continuous Optimization

    Setting Up Comprehensive Monitoring

    1. Enable Enhanced Monitoring and Performance Insights:




    aws rds modify-db-instance \
    --db-instance-identifier high-throughput-writer \
    --monitoring-interval 5 \
    --monitoring-role-arn arn:aws:iam::123456789012:role/rds-monitoring-role \
    --enable-performance-insights \
    --performance-insights-retention-period 7 \
    --region us-east-1







    1. Create CloudWatch alarms for critical metrics:




    aws cloudwatch put-metric-alarm \
    --alarm-name HighCPUUtilization-Aurora \
    --alarm-description "Alarm when CPU exceeds 80%" \
    --metric-name CPUUtilization \
    --namespace AWS/RDS \
    --statistic Average \
    --period 300 \
    --threshold 80 \
    --comparison-operator GreaterThanThreshold \
    --dimensions Name=DBInstanceIdentifier,Value=high-throughput-writer \
    --evaluation-periods 3 \
    --alarm-actions arn:aws:sns:us-east-1:123456789012:rds-alerts \
    --region us-east-1







    1. Create custom metrics dashboard:




    aws cloudwatch put-dashboard \
    --dashboard-name AuroraHighThroughputDashboard \
    --dashboard-body '{
    "widgets": [
    {
    "type": "metric",
    "x": 0,
    "y": 0,
    "width": 12,
    "height": 6,
    "properties": {
    "metrics": [
    [ "AWS/RDS", "CPUUtilization", "DBInstanceIdentifier", "high-throughput-writer" ]
    ],
    "period": 300,
    "stat": "Average",
    "region": "us-east-1",
    "title": "Writer CPU"
    }
    },
    {
    "type": "metric",
    "x": 12,
    "y": 0,
    "width": 12,
    "height": 6,
    "properties": {
    "metrics": [
    [ "AWS/RDS", "DMLLatency", "DBInstanceIdentifier", "high-throughput-writer" ]
    ],
    "period": 300,
    "stat": "Average",
    "region": "us-east-1",
    "title": "Write Latency"
    }
    }
    ]
    }' \
    --region us-east-1








    Benchmarking Your Aurora Deployment

    Use pgbench (PostgreSQL) or sysbench (MySQL) to establish performance baselines:






    # For PostgreSQL
    pgbench -h high-throughput-demo.cluster-abcdefghijkl.us-east-1.rds.amazonaws.com \
    -U admin \
    -p 5432 \
    -d mydatabase \
    -c 20 \
    -j 4 \
    -T 60 \
    -P 10








    Advanced Optimization Techniques

    Memory Optimization

    Fine-tune memory-related parameters:






    -- For PostgreSQL
    ALTER SYSTEM SET work_mem = '64MB';
    ALTER SYSTEM SET maintenance_work_mem = '1GB';
    ALTER SYSTEM SET effective_cache_size = '24GB'; -- Adjust based on instance memory
    SELECT pg_reload_conf();








    Vacuum and Analyze Operations

    Regular vacuum and analyze operations are critical for PostgreSQL performance:






    -- Set up autovacuum parameters
    ALTER SYSTEM SET autovacuum_vacuum_scale_factor = 0.05;
    ALTER SYSTEM SET autovacuum_analyze_scale_factor = 0.025;
    ALTER SYSTEM SET autovacuum_vacuum_cost_limit = 2000;
    SELECT pg_reload_conf();

    -- Manual VACUUM ANALYZE for immediate effect
    VACUUM ANALYZE orders;








    Advanced Optimization Techniques

    Using Aurora Global Database for Multi-Region Deployments

    For global applications requiring low-latency reads across regions:






    # Create a global database
    aws rds create-global-cluster \
    --global-cluster-identifier global-high-throughput \
    --source-db-cluster-identifier arn:aws:rds:us-east-1:123456789012:cluster:high-throughput-demo \
    --region us-east-1








    Add a secondary region:






    aws rds create-db-cluster \
    --db-cluster-identifier high-throughput-demo-eu \
    --engine aurora-postgresql \
    --engine-version 15.3 \
    --db-subnet-group-name your-eu-subnet-group \
    --vpc-security-group-ids sg-0123456789abcdef \
    --global-cluster-identifier global-high-throughput \
    --region eu-west-1








    Add instances to the secondary region:






    aws rds create-db-instance \
    --db-instance-identifier high-throughput-reader-eu \
    --db-cluster-identifier high-throughput-demo-eu \
    --engine aurora-postgresql \
    --db-instance-class db.r6g.2xlarge \
    --region eu-west-1








    Optimizing for Specific Workload Types

    Write-Intensive Workloads

    For applications with heavy write loads:

    1. Batch operations where possible:




    -- Instead of individual inserts
    INSERT INTO orders (order_id, customer_id, order_date, total_amount, status)
    VALUES
    (1001, 101, '2025-03-15', 299.99, 'PENDING'),
    (1002, 102, '2025-03-15', 199.50, 'PENDING'),
    (1003, 103, '2025-03-15', 599.75, 'PENDING'),
    (1004, 104, '2025-03-15', 149.99, 'PENDING'),
    (1005, 105, '2025-03-15', 849.99, 'PENDING');







    1. Reduce index overhead on write-heavy tables:




    -- Temporarily disable triggers during bulk loads
    ALTER TABLE orders DISABLE TRIGGER ALL;
    -- Perform bulk insert
    -- Then re-enable triggers
    ALTER TABLE orders ENABLE TRIGGER ALL;







    1. Consider UNLOGGED tables (PostgreSQL) for temporary data:




    CREATE UNLOGGED TABLE temp_orders (
    order_id BIGINT NOT NULL,
    customer_id INT NOT NULL,
    order_date DATE NOT NULL,
    total_amount DECIMAL(10,2),
    status VARCHAR(20)
    );








    Read-Intensive Workloads

    For read-heavy applications:

    1. Implement materialized views for complex queries:




    -- For PostgreSQL
    CREATE MATERIALIZED VIEW monthly_sales AS
    SELECT
    DATE_TRUNC('month', order_date) AS month,
    customer_id,
    SUM(total_amount) AS monthly_total,
    COUNT(*) AS order_count
    FROM orders
    GROUP BY DATE_TRUNC('month', order_date), customer_id;

    -- Create index on the materialized view
    CREATE INDEX idx_monthly_sales_customer ON monthly_sales (customer_id, month);

    -- Refresh as needed
    REFRESH MATERIALIZED VIEW monthly_sales;







    1. Use Query Caching at the application level:




    import redis
    import json
    import hashlib
    import psycopg2

    # Initialize Redis client
    redis_client = redis.Redis(host='your-redis-host.cache.amazonaws.com', port=6379)

    def get_cached_query_results(query, params=None, ttl=300):
    # Create a cache key based on the query and parameters
    cache_key = f"db:query:{hashlib.md5((query + str(params)).encode()).hexdigest()}"

    # Check if results exist in cache
    cached_result = redis_client.get(cache_key)
    if cached_result:
    return json.loads(cached_result)

    # If not in cache, execute query
    conn = get_read_connection()
    cursor = conn.cursor()
    cursor.execute(query, params)
    columns = [desc[0] for desc in cursor.description]
    results = [dict(zip(columns, row)) for row in cursor.fetchall()]
    cursor.close()
    conn.close()

    # Store in cache with TTL
    redis_client.setex(cache_key, ttl, json.dumps(results))
    return results








    Lab: Real-Time Performance Testing and Tuning

    Let's create a comprehensive testing setup to optimize our Aurora deployment under load.


    Step 1: Create Test Data

    We'll use pgbench to initialize a test database and then extend it with our custom schema:






    # Connect to your Aurora instance
    psql "host=high-throughput-demo.cluster-abcdefghijkl.us-east-1.rds.amazonaws.com port=5432 dbname=postgres user=admin password=YourStrongPassword123!"

    # Create test database
    CREATE DATABASE perf_test;
    \c perf_test

    # Initialize with pgbench schema (100 scaling factor ~ 10 million rows)
    \! pgbench -i -s 100 -h high-throughput-demo.cluster-abcdefghijkl.us-east-1.rds.amazonaws.com -p 5432 -U admin perf_test








    Step 2: Create Custom Test Schema





    -- Create orders table with partitioning
    CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    customer_id INT NOT NULL,
    order_date TIMESTAMP NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    items JSONB,
    shipping_address TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    ) PARTITION BY RANGE (order_date);

    -- Create partitions
    CREATE TABLE orders_q1_2025 PARTITION OF orders
    FOR VALUES FROM ('2025-01-01') TO ('2025-04-01');

    CREATE TABLE orders_q2_2025 PARTITION OF orders
    FOR VALUES FROM ('2025-04-01') TO ('2025-07-01');

    -- Create appropriate indexes
    CREATE INDEX idx_orders_customer_date ON orders (customer_id, order_date);
    CREATE INDEX idx_orders_status ON orders (status);
    CREATE INDEX idx_orders_items ON orders USING GIN (items);

    -- Create customers table
    CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    tier VARCHAR(20) NOT NULL,
    signup_date DATE NOT NULL,
    last_order_date TIMESTAMP,
    total_orders INT DEFAULT 0,
    total_spent DECIMAL(12,2) DEFAULT 0
    );

    CREATE INDEX idx_customers_tier ON customers (tier);
    CREATE INDEX idx_customers_signup ON customers (signup_date);








    Step 3: Generate Test Data

    Create a Python script to generate realistic test data:






    import psycopg2
    import random
    import json
    from datetime import datetime, timedelta
    import time

    # Connect to database
    conn = psycopg2.connect(
    host="high-throughput-demo.cluster-abcdefghijkl.us-east-1.rds.amazonaws.com",
    port=5432,
    database="perf_test",
    user="admin",
    password="YourStrongPassword123!"
    )
    conn.autocommit = False
    cursor = conn.cursor()

    # Generate customers
    customer_tiers = ["BRONZE", "SILVER", "GOLD", "PLATINUM"]
    product_names = ["Laptop", "Smartphone", "Headphones", "Monitor", "Keyboard", "Mouse", "Tablet", "Camera", "Printer", "Speaker"]
    statuses = ["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "RETURNED", "CANCELLED"]

    # Insert customers
    print("Generating customers...")
    for i in range(1, 100001):
    name = f"Customer {i}"
    email = f"customer{i}@example.com"
    tier = random.choice(customer_tiers)
    signup_date = datetime(2024, 1, 1) + timedelta(days=random.randint(0, 365))

    cursor.execute(
    "INSERT INTO customers (name, email, tier, signup_date) VALUES (%s, %s, %s, %s)",
    (name, email, tier, signup_date)
    )

    if i % 1000 == 0:
    conn.commit()
    print(f"Inserted {i} customers")

    conn.commit()
    print("Finished generating customers")

    # Insert orders
    print("Generating orders...")
    start_date = datetime(2025, 1, 1)
    end_date = datetime(2025, 6, 30)
    days_range = (end_date - start_date).days

    for i in range(1, 500001):
    customer_id = random.randint(1, 100000)
    order_date = start_date + timedelta(days=random.randint(0, days_range))

    # Generate between 1 and 5 items for each order
    num_items = random.randint(1, 5)
    items = []
    total_amount = 0

    for _ in range(num_items):
    product = random.choice(product_names)
    price = round(random.uniform(9.99, 999.99), 2)
    quantity = random.randint(1, 3)
    item_total = price * quantity
    total_amount += item_total

    items.append({
    "product": product,
    "price": price,
    "quantity": quantity,
    "item_total": item_total
    })

    status = random.choice(statuses)
    shipping_address = f"{random.randint(100, 999)} Main St, City {random.randint(1, 100)}, State {random.randint(1, 50)}"

    # Insert order
    cursor.execute(
    "INSERT INTO orders (customer_id, order_date, total_amount, status, items, shipping_address) VALUES (%s, %s, %s, %s, %s, %s)",
    (customer_id, order_date, total_amount, status, json.dumps(items), shipping_address)
    )

    # Update customer stats (in real application, consider using triggers for this)
    cursor.execute(
    "UPDATE customers SET total_orders = total_orders + 1, total_spent = total_spent + %s, last_order_date = %s WHERE customer_id = %s",
    (total_amount, order_date, customer_id)
    )

    if i % 1000 == 0:
    conn.commit()
    print(f"Inserted {i} orders")

    conn.commit()
    print("Finished generating orders")

    # Update statistics
    print("Analyzing tables...")
    cursor.execute("VACUUM ANALYZE customers")
    cursor.execute("VACUUM ANALYZE orders")
    print("Done!")

    cursor.close()
    conn.close()








    Step 4: Create Test Workload Scripts

    Create scripts to simulate real-world read and write patterns:


    A. Read_workload.py






    # read_workload.py - Simulates high-throughput read queries
    import psycopg2
    import random
    import time
    import concurrent.futures
    from datetime import datetime, timedelta

    # Read connection pool
    read_endpoints = [
    "high-throughput-reader-1.cluster-ro-abcdefghijkl.us-east-1.rds.amazonaws.com",
    "high-throughput-reader-2.cluster-ro-abcdefghijkl.us-east-1.rds.amazonaws.com"
    ]

    # Common queries representing typical workload
    queries = [
    # Query 1: Get recent orders for a customer
    """
    SELECT o.order_id, o.order_date, o.total_amount, o.status
    FROM orders o
    WHERE o.customer_id = %s
    ORDER BY o.order_date DESC
    LIMIT 10
    """,

    # Query 2: Get orders in a date range with filtering
    """
    SELECT o.order_id, o.customer_id, o.order_date, o.total_amount, o.status
    FROM orders o
    WHERE o.order_date BETWEEN %s AND %s
    AND o.status = %s
    ORDER BY o.total_amount DESC
    LIMIT 100
    """,

    # Query 3: Get summary statistics
    """
    SELECT
    COUNT(*) as order_count,
    SUM(total_amount) as total_revenue,
    AVG(total_amount) as avg_order_value,
    MIN(total_amount) as min_order,
    MAX(total_amount) as max_order
    FROM orders
    WHERE order_date BETWEEN %s AND %s
    """,

    # Query 4: Complex aggregation with join
    """
    SELECT
    c.tier,
    DATE_TRUNC('month', o.order_date) as month,
    COUNT(DISTINCT c.customer_id) as unique_customers,
    COUNT(o.order_id) as order_count,
    SUM(o.total_amount) as total_revenue
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    WHERE o.order_date BETWEEN %s AND %s
    GROUP BY c.tier, DATE_TRUNC('month', o.order_date)
    ORDER BY month, tier
    """,

    # Query 5: JSONB filtering
    """
    SELECT o.order_id, o.total_amount
    FROM orders o
    WHERE o.items @> '[{"product": %s}]'
    LIMIT 50
    """
    ]

    def get_connection():
    endpoint = random.choice(read_endpoints)
    return psycopg2.connect(
    host=endpoint,
    port=5432,
    database="perf_test",
    user="admin",
    password="YourStrongPassword123!"
    )

    def execute_query(query_idx):
    conn = get_connection()
    cursor = conn.cursor()

    try:
    start_time = time.time()

    # Generate parameters based on query type
    if query_idx == 0:
    # Random customer ID
    params = (random.randint(1, 100000),)
    elif query_idx == 1:
    # Date range and status
    start_date = datetime(2025, 1, 1) + timedelta(days=random.randint(0, 150))
    end_date = start_date + timedelta(days=30)
    status = random.choice(["PENDING", "PROCESSING", "SHIPPED", "DELIVERED"])
    params = (start_date, end_date, status)
    elif query_idx == 2 or query_idx == 3:
    # Date range
    start_date = datetime(2025, 1, 1) + timedelta(days=random.randint(0, 150))
    end_date = start_date + timedelta(days=30)
    params = (start_date, end_date)
    elif query_idx == 4:
    # Product for JSONB query
    params = (random.choice(["Laptop", "Smartphone", "Headphones", "Monitor", "Keyboard"]),)

    cursor.execute(queries[query_idx], params)
    cursor.fetchall() # Actually fetch the results

    end_time = time.time()
    return end_time - start_time
    except Exception as e:
    print(f"Query error: {e}")
    return None
    finally:
    cursor.close()
    conn.close()

    def run_workload(num_queries=1000, max_workers=20):
    print(f"Starting read workload simulation with {num_queries} queries...")
    query_times = []

    with concurrent.futures.ThreadPoolExecutor(max_workers= max_workers) as executor:
    futures = []

    for _ in range(num_queries):
    # Select a random query type
    query_idx = random.randint(0, len(queries) - 1)
    futures.append(executor.submit(execute_query, query_idx))

    for future in concurrent.futures.as_completed(futures):
    exec_time = future.result()
    if exec_time is not None:
    query_times.append(exec_time)

    # Calculate statistics
    if query_times:
    avg_time = sum(query_times) / len(query_times)
    min_time = min(query_times)
    max_time = max(query_times)
    p95_time = sorted(query_times)[int(len(query_times) * 0.95)]

    print(f"Completed {len(query_times)} queries")
    print(f"Average query time: {avg_time:.4f} seconds")
    print(f"Min query time: {min_time:.4f} seconds")
    print(f"Max query time: {max_time:.4f} seconds")
    print(f"95th percentile query time: {p95_time:.4f} seconds")
    else:
    print("No successful queries")
    if name == "main":
    run_workload(num_queries=5000, max_workers=50)








    B. Write_workload.py


    Create a write workload script:






    # write_workload.py - Simulates high-throughput write operations
    import psycopg2
    import random
    import time
    import concurrent.futures
    from datetime import datetime, timedelta
    import json

    # Write endpoint (primary instance)
    write_endpoint = "high-throughput-demo.cluster-abcdefghijkl.us-east-1.rds.amazonaws.com"

    # Product names for generating random orders
    product_names = ["Laptop", "Smartphone", "Headphones", "Monitor", "Keyboard", "Mouse", "Tablet", "Camera", "Printer", "Speaker"]
    statuses = ["PENDING", "PROCESSING"]

    def get_connection():
    return psycopg2.connect(
    host=write_endpoint,
    port=5432,
    database="perf_test",
    user="admin",
    password="YourStrongPassword123!"
    )

    def execute_insert():
    conn = get_connection()
    cursor = conn.cursor()

    try:
    start_time = time.time()

    # Generate a random order
    customer_id = random.randint(1, 100000)
    order_date = datetime.now()

    # Generate between 1 and 5 items
    num_items = random.randint(1, 5)
    items = []
    total_amount = 0

    for _ in range(num_items):
    product = random.choice(product_names)
    price = round(random.uniform(9.99, 999.99), 2)
    quantity = random.randint(1, 3)
    item_total = price * quantity
    total_amount += item_total

    items.append({
    "product": product,
    "price": price,
    "quantity": quantity,
    "item_total": item_total
    })

    status = random.choice(statuses)
    shipping_address = f"{random.randint(100, 999)} Main St, City {random.randint(1, 100)}, State {random.randint(1, 50)}"

    # Insert order
    cursor.execute(
    "INSERT INTO orders (customer_id, order_date, total_amount, status, items, shipping_address) VALUES (%s, %s, %s, %s, %s, %s) RETURNING order_id",
    (customer_id, order_date, total_amount, status, json.dumps(items), shipping_address)
    )
    order_id = cursor.fetchone()[0]

    # Update customer stats
    cursor.execute(
    "UPDATE customers SET total_orders = total_orders + 1, total_spent = total_spent + %s, last_order_date = %s WHERE customer_id = %s",
    (total_amount, order_date, customer_id)
    )

    conn.commit()
    end_time = time.time()

    return {"order_id": order_id, "exec_time": end_time - start_time}
    except Exception as e:
    conn.rollback()
    print(f"Insert error: {e}")
    return None
    finally:
    cursor.close()
    conn.close()

    def execute_update():
    conn = get_connection()
    cursor = conn.cursor()

    try:
    start_time = time.time()

    # Get a random order ID
    cursor.execute("SELECT order_id FROM orders ORDER BY RANDOM() LIMIT 1")
    result = cursor.fetchone()
    if not result:
    return None
    order_id = result[0]

    # Update the order status
    new_status = random.choice(["SHIPPED", "DELIVERED"])
    cursor.execute(
    "UPDATE orders SET status = %s WHERE order_id = %s",
    (new_status, order_id)
    )

    conn.commit()
    end_time = time.time()

    return {"order_id": order_id, "exec_time": end_time - start_time}
    except Exception as e:
    conn.rollback()
    print(f"Update error: {e}")
    return None
    finally:
    cursor.close()
    conn.close()

    def run_workload(num_operations=1000, insert_ratio=0.7, max_workers=20):
    print(f"Starting write workload simulation with {num_operations} operations...")
    operation_times = {"insert": [], "update": []}

    with concurrent.futures.ThreadPoolExecutor(max_workers= max_workers) as executor:
    futures = []

    for _ in range(num_operations):
    # Determine operation type based on ratio
    if random.random() insert_ratio:
    futures.append((executor.submit(execute_insert), "insert"))
    else:
    futures.append((executor.submit(execute_update), "update"))

    for future, op_type in futures:
    result = future.result()
    if result is not None:
    operation_times[op_type].append(result["exec_time"])

    # Calculate statistics for each operation type
    for op_type, times in operation_times.items():
    if times:
    avg_time = sum(times) / len(times)
    min_time = min(times)
    max_time = max(times)
    p95_time = sorted(times)[int(len(times) * 0.95)]

    print(f"\\nCompleted {len(times)} {op_type} operations")
    print(f"Average {op_type} time: {avg_time:.4f} seconds")
    print(f"Min {op_type} time: {min_time:.4f} seconds")
    print(f"Max {op_type} time: {max_time:.4f} seconds")
    print(f"95th percentile {op_type} time: {p95_time:.4f} seconds")
    else:
    print(f"No successful {op_type} operations")

    if __name__ == "__main__":
    run_workload(num_operations=2000, insert_ratio=0.7, max_workers=30)







    Step 5: Performance Testing and Tuning Process

    Now let's establish a systematic approach to performance testing and tuning:

    1. Establish baseline performance:




    # Run read workload test
    python read_workload.py

    # Run write workload test
    python write_workload.py







    1. Identify bottlenecks using Performance Insights and CloudWatch Metrics:


    Monitor key metrics during the test:
    • CPU Utilization
    • Database connections
    • Buffer cache hit ratio
    • Read/write IOPS
    • Read/write latency
    • Network throughput
    • Wait events in Performance Insights
    • Tune parameters based on identified bottlenecks:


    For PostgreSQL workloads with high connection rates:






    -- Increase max connections
    ALTER SYSTEM SET max_connections = '1000';

    -- Tune shared_buffers (typically 25% of instance memory)
    ALTER SYSTEM SET shared_buffers = '8GB';

    -- Tune work_mem for complex queries
    ALTER SYSTEM SET work_mem = '64MB';

    -- Tune maintenance_work_mem for vacuum operations
    ALTER SYSTEM SET maintenance_work_mem = '1GB';

    -- Tune effective_cache_size (typically 75% of instance memory)
    ALTER SYSTEM SET effective_cache_size = '24GB';

    -- Adjust autovacuum parameters for write-heavy workloads
    ALTER SYSTEM SET autovacuum_vacuum_scale_factor = 0.05;
    ALTER SYSTEM SET autovacuum_analyze_scale_factor = 0.025;
    ALTER SYSTEM SET autovacuum_vacuum_cost_limit = 2000;

    -- Apply changes
    SELECT pg_reload_conf();







    1. After each parameter change, re-run tests and measure impact:




    # Run read and write workload tests again
    python read_workload.py
    python write_workload.py







    1. Document performance improvements:


    Create a documentation file showing:
    • Baseline performance metrics
    • Identified bottlenecks
    • Parameter changes made
    • Performance improvements observed
    • Recommended final configuration


    Best Practices for Ongoing Optimization

    Implement a Regular Performance Review Process

    1. Schedule weekly reviews of Performance Insights and CloudWatch metrics.
    2. Set up automated alerting for performance degradation.
    3. Document performance changes when application behavior changes.


    Create a Database Parameter Change Management Process

    1. Test all parameter changes in a staging environment first.
    2. Document each parameter change with justification and expected impact.
    3. Implement changes during maintenance windows when possible.
    4. Maintain parameter groups in version control or AWS CloudFormation.


    SQL Query Review Process

    Establish a code review process for database queries:

    1. Require EXPLAIN ANALYZE for all new queries.
    2. Set performance targets for critical queries.
    3. Maintain a query library of common patterns and anti-patterns.


    Conclusion πŸš€πŸ’‘

    Optimizing Aurora PostgreSQL/MySQL for high-throughput workloads requires a systematic approach covering instance sizing, schema design, query optimization, and Aurora-specific features. By following the hands-on lab and implementing the best practices outlined in this article, you can achieve significant performance improvements for your most demanding workloads.


    Remember that performance optimization is an ongoing process rather than a one-time task. Regular monitoring, testing, and tuning are essential to maintain optimal performance as your workloads evolve. ✨✨


    Additional Resources





    More...
Working...