How Does SeaTunnel Perform "Precise Sharding" for MySQL Tables?

  • MyrinNew
    Senior Member
    • Feb 2024
    • 5168

    #1


    Overview

    To read in parallel, the Apache SeaTunnel MySQL CDC connector divides large tables into multiple splits. It chooses among several sharding strategies, including for tables without a primary key, so that the data stays complete and the reads stay efficient. This article covers the core sharding strategies that Apache SeaTunnel supports, how each one is selected and implemented, and the advantages and disadvantages of each.


    1. Sharding Column Selection Strategy

    1.1 Selection Priority





    1. User-configured snapshotSplitColumn (preferably a unique key)
    2. Primary key column (selected based on data type priority)
    3. Unique key column (selected based on data type priority)
    4. No available columns → Single split strategy







    1.2 Supported Data Types

    Data types supported by the MySQL CDC connector:

    According to the implementation of the AbstractJdbcSourceChunkSplitter.isEvenlySplitColumn() method:






    // AbstractJdbcSourceChunkSplitter.isEvenlySplitColumn()
    switch (fromDbzColumn(splitColumn).getSqlType()) {
        case TINYINT:
        case SMALLINT:
        case INT:
        case BIGINT:
        case DECIMAL:
        case STRING:
            return true;
        default:
            return false;
    }







    ✅ Supported types:
    • Numeric types: TINYINT, SMALLINT, INT, BIGINT, DECIMAL
    • String type: STRING (using hash sharding)


    ❌ Note: MySQL CDC does not support sharding by datetime types

    Unsupported types:
    • DATE: Not supported as a sharding column
    • DATETIME: Not supported as a sharding column
    • TIMESTAMP: Not supported as a sharding column
    • TIME: Not supported as a sharding column


    Comparison: Support status of the ordinary JDBC connector

    It is worth noting that the ordinary JDBC connector (DynamicChunkSplitter) supports sharding by DATE type:






    // Types supported by DynamicChunkSplitter
    switch (splitColumnType.getSqlType()) {
    case TINYINT:
    case SMALLINT:
    case INT:
    case BIGINT:
    case DECIMAL:
    case DOUBLE:
    case FLOAT:
    return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
    case STRING:
    // String sharding logic
    case DATE: // ✅ Ordinary JDBC supports DATE type
    return dateColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
    }







    Practical impacts and solutions

    If a table only has index columns of datetime types, MySQL CDC will:

    1. Fail to find a suitable sharding column
    2. Fall back to the single split mode
    3. Lose the advantage of parallel reading


    1.3 Data Type Priority





    // Priority: 1 is the highest, 6 is the lowest
    TINYINT(1) > SMALLINT(2) > INT(3) > BIGINT(4) > DECIMAL(5) > STRING(6)
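The priority order above can be sketched as a simple "lowest rank wins" selection. This is a minimal illustration, not the connector's code: the `SqlType` enum, the `Column` class, and `pickSplitColumn` are hypothetical names introduced here, and only the ranking itself comes from the list above.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class SplitColumnPrioritySketch {
    // Hypothetical enum mirroring the priority list above (1 = most preferred).
    enum SqlType {
        TINYINT(1), SMALLINT(2), INT(3), BIGINT(4), DECIMAL(5), STRING(6);

        final int priority;

        SqlType(int priority) {
            this.priority = priority;
        }
    }

    // Hypothetical column descriptor: just a name and a type.
    static final class Column {
        final String name;
        final SqlType type;

        Column(String name, SqlType type) {
            this.name = name;
            this.type = type;
        }
    }

    // Picks the candidate whose type has the smallest priority number.
    // An empty result stands for "no usable column, fall back to a single split".
    static Optional<Column> pickSplitColumn(List<Column> candidates) {
        return candidates.stream().min(Comparator.comparingInt(c -> c.type.priority));
    }

    public static void main(String[] args) {
        List<Column> keyColumns = Arrays.asList(
                new Column("tenant_code", SqlType.STRING),
                new Column("order_id", SqlType.BIGINT),
                new Column("region_id", SqlType.INT));
        System.out.println(
                pickSplitColumn(keyColumns).map(c -> c.name).orElse("<single split>"));
    }
}
```

In this sketch an INT column beats a BIGINT or STRING column from the same key, which matches the ordering given above.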







    2. Sharding Strategy Decision Mechanism

    SeaTunnel chooses a sharding strategy from three inputs: the type of the sharding column, the distribution factor of its values, and the estimated number of splits (the approximate row count divided by the split size).


    2.1 Overview of the Decision-Making Process





    // The core decision-making logic is located in AbstractJdbcSourceChunkSplitter.generateSplits()
    public Collection<SnapshotSplit> generateSplits(JdbcConnection jdbc, TableId tableId) {
        // 1. Obtain configuration parameters
        final int chunkSize = sourceConfig.getSplitSize(); // Default: 8096
        final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); // Default: 100.0
        final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); // Default: 0.05
        final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold(); // Default: 1000

        // 2. Check the sharding column type
        if (isEvenlySplitColumn(splitColumn)) {
            // 3. Query the approximate number of rows and calculate the distribution factor
            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
            double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt);

            // 4. Determine whether the data distribution is uniform
            boolean dataIsEvenlyDistributed =
                    distributionFactor >= distributionFactorLower &&
                    distributionFactor <= distributionFactorUpper;

            if (dataIsEvenlyDistributed) {
                // Uniform sharding strategy
                return splitEvenlySizedChunks(...);
            } else {
                // 5. Check if the sampling strategy is needed
                int shardCount = (int) (approximateRowCnt / chunkSize);
                if (sampleShardingThreshold < shardCount) {
                    // Sampling-based sharding strategy
                    return efficientShardingThroughSampling(...);
                } else {
                    // Non-uniform sharding strategy
                    return splitUnevenlySizedChunks(...);
                }
            }
        } else {
            // String type: Non-uniform sharding strategy
            return splitUnevenlySizedChunks(...);
        }
    }







    2.2 Calculation of the Distribution Factor

    Core formula:






    distributionFactor = (MAX - MIN + 1) / approximateRowCount







    Calculation logic:






    protected double calculateDistributionFactor(TableId tableId, Object min, Object max, long approximateRowCnt) {
    if (approximateRowCnt == 0) {
    return Double.MAX_VALUE; // Handling for empty tables
    }

    BigDecimal difference = ObjectUtils.minus(max, min);
    final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
    double distributionFactor = subRowCnt.divide(
    new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();

    return distributionFactor;
    }







    Meaning of the distribution factor:
    • factor ≈ 1.0: The data distribution is ideal, with continuous IDs and no gaps
    • factor > 100: Sparse data, where the ID range is much larger than the number of rows (e.g., IDs 1-1,000,000 but only 1,000 rows)
    • factor < 0.05: Dense data, where multiple rows share similar ID values (e.g., a timestamp column with multiple records in the same second)
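As a worked check of the formula and the three ranges above, here is a minimal sketch that assumes a numeric (long) sharding column. The class and method names are hypothetical; the rounding (scale 4, ceiling) follows the calculation logic shown earlier, and the 0.05 and 100 bounds are the defaults quoted in this article.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class DistributionFactorSketch {
    // (max - min + 1) / approximateRowCnt, rounded up to 4 decimal places.
    static double distributionFactor(long min, long max, long approximateRowCnt) {
        if (approximateRowCnt == 0) {
            return Double.MAX_VALUE; // Empty table: treated as maximally sparse
        }
        BigDecimal span = BigDecimal.valueOf(max)
                .subtract(BigDecimal.valueOf(min))
                .add(BigDecimal.ONE);
        return span.divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                .doubleValue();
    }

    // Labels a factor against the lower and upper bounds (defaults: 0.05 and 100).
    static String classify(double factor, double lower, double upper) {
        if (factor < lower) {
            return "dense";
        }
        if (factor > upper) {
            return "sparse";
        }
        return "even";
    }

    public static void main(String[] args) {
        double factor = distributionFactor(1, 1_000_000, 1_000);
        System.out.println(factor + " -> " + classify(factor, 0.05, 100.0));
    }
}
```

The `main` method reproduces the sparse case mentioned above (IDs 1 to 1,000,000 with only 1,000 rows).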


    2.3 Detailed Explanation of Decision-Making Conditions

    Condition 1: Sharding Column Type Check





    // Data types that support uniform sharding
    private boolean isEvenlySplitColumn(Column splitColumn) {
    return splitColumn.isNumeric() || splitColumn.isTemporalType();
    }







    Condition 2: Judgment of Data Distribution Uniformity





    boolean dataIsEvenlyDistributed =
            doubleCompare(distributionFactor, distributionFactorLower) >= 0 &&
            doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
    // That is: 0.05 ≤ distributionFactor ≤ 100







    Condition 3: Trigger Condition for the Sampling Strategy





    int shardCount = (int) (approximateRowCnt / chunkSize);
    if (sampleShardingThreshold < shardCount) {
        // Enable the sampling strategy when the estimated number of splits exceeds 1000
    }







    2.4 Practical Decision-Making Examples

    Example 1: Ideal Uniform Distribution





    Table: user_orders
    Sharding column: order_id (BIGINT)
    Data range: 1 - 100,000
    Number of rows: 100,000
    chunkSize: 10,000

    Calculation:
    distributionFactor = (100000 - 1 + 1) / 100000 = 1.0
    Judgment: 0.05 ≤ 1.0 ≤ 100 → Uniform data distribution
    Result: Use the uniform sharding strategy to generate 10 splits







    Example 2: Sparse Data, Sampling Not Triggered





    Table: big_transactions
    Sharding column: transaction_id (BIGINT)
    Data range: 1 - 10,000,000
    Number of rows: 50,000
    chunkSize: 1,000

    Calculation:
    distributionFactor = (10000000 - 1 + 1) / 50000 = 200
    Estimated number of splits = 50000 / 1000 = 50
    Judgment: 200 > 100 → Non-uniform data distribution
    50 ≤ 1000 → Sampling strategy not triggered
    Result: Use the non-uniform sharding strategy







    Example 3: Large Table Triggering the Sampling Strategy





    Table: log_events
    Sharding column: event_id (BIGINT)
    Data range: 1 - 100,000
    Number of rows: 5,000,000
    chunkSize: 1,000

    Calculation:
    distributionFactor = (100000 - 1 + 1) / 5000000 = 0.02
    Estimated number of splits = 5000000 / 1000 = 5000
    Judgment: 0.02 < 0.05 → Non-uniform data distribution
    5000 > 1000 → Trigger the sampling strategy
    Result: Use the sampling-based sharding strategy, generate 5000 splits after sampling







    Example 4: Dense Distribution of Timestamp Columns (assuming timestamp type is supported)





    Table: sensor_data
    Sharding column: timestamp (TIMESTAMP)
    Data range: 2023-01-01 00:00:00 - 2023-01-01 01:00:00 (3600 seconds)
    Number of rows: 1,000,000
    chunkSize: 10,000
    Calculation:
    distributionFactor = 3600 / 1000000 = 0.0036
    Judgment: 0.0036 < 0.05 → Non-uniform data distribution
    Estimated number of splits = 1000000 / 10000 = 100
    100 ≤ 1000 → Sampling strategy not triggered
    Result: Use the non-uniform sharding strategy







    2.5 Summary of Strategy Selection

    Scenario | Distribution factor | Estimated splits | Strategy | Typical cases
    Numeric column + Uniform distribution | [0.05, 100] | Any | Uniform sharding | Auto-incrementing IDs, uniformly distributed numeric values
    Numeric column + Non-uniform + Small table | < 0.05 or > 100 | ≤ 1000 | Non-uniform sharding | Sparse IDs, dense timestamps
    Numeric column + Non-uniform + Large table | < 0.05 or > 100 | > 1000 | Sampling-based sharding | Large tables with extremely non-uniform distribution
    String column | Not applicable | Any | Non-uniform sharding | String-type sharding columns
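The selection rules in this section can be condensed into one small function. The sketch below is an illustration under stated assumptions rather than the connector's code: `ShardingDecisionSketch`, `Strategy`, and `choose` are hypothetical names, and the thresholds are passed in explicitly so the default values (0.05, 100, 1000) are visible at the call site.

```java
class ShardingDecisionSketch {
    enum Strategy { UNIFORM, NON_UNIFORM, SAMPLING }

    // Mirrors the decision order described above:
    // column type -> distribution factor -> estimated number of splits.
    static Strategy choose(
            boolean evenlySplittableColumn,
            double distributionFactor,
            long approximateRowCnt,
            int chunkSize,
            double factorLower,
            double factorUpper,
            int sampleShardingThreshold) {
        if (!evenlySplittableColumn) {
            return Strategy.NON_UNIFORM;
        }
        boolean evenlyDistributed =
                distributionFactor >= factorLower && distributionFactor <= factorUpper;
        if (evenlyDistributed) {
            return Strategy.UNIFORM;
        }
        int shardCount = (int) (approximateRowCnt / chunkSize);
        return sampleShardingThreshold < shardCount ? Strategy.SAMPLING : Strategy.NON_UNIFORM;
    }

    public static void main(String[] args) {
        // The four worked examples from section 2.4, using the default thresholds.
        System.out.println(choose(true, 1.0, 100_000L, 10_000, 0.05, 100.0, 1000));
        System.out.println(choose(true, 200.0, 50_000L, 1_000, 0.05, 100.0, 1000));
        System.out.println(choose(true, 0.02, 5_000_000L, 1_000, 0.05, 100.0, 1000));
        System.out.println(choose(true, 0.0036, 1_000_000L, 10_000, 0.05, 100.0, 1000));
    }
}
```

Feeding in the numbers from the examples in section 2.4 is a quick way to predict which strategy a table will get before running a job.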


    3. Three Core Sharding Strategies

    3.1 Uniform Sharding (Evenly Sized Chunks)

    Applicable scenarios: Numeric columns with uniform data distribution


    Judgment conditions:






    // Calculation of the distribution factor
    distributionFactor = (max - min + 1) / approximateRowCount

    // Judgment of uniform distribution
    distributionFactorLower <= distributionFactor <= distributionFactorUpper
    // Default: 0.05 <= distributionFactor <= 100.0







    Sharding logic:






    // Calculation of dynamic chunk size
    dynamicChunkSize = Math.max((int)(distributionFactor * chunkSize), 1)

    // Calculation of sharding range
    chunkStart = null
    chunkEnd = min + dynamicChunkSize
    while (chunkEnd <= max) {
        splits.add(ChunkRange.of(chunkStart, chunkEnd))
        chunkStart = chunkEnd
        chunkEnd = chunkEnd + dynamicChunkSize
    }
    // Add the last split
    splits.add(ChunkRange.of(chunkStart, null))







    Example:






    Table: user_table, Primary key: id, Range: 1-10000, Number of rows: 10000
    distributionFactor = (10000-1+1)/10000 = 1.0
    chunkSize = 1000, dynamicChunkSize = 1000

    Sharding result:
    Split1: [null, 1000] // id < 1000
    Split2: [1000, 2000] // 1000 <= id < 2000
    Split3: [2000, 3000] // 2000 <= id < 3000
    ...
    Split10: [9000, null] // id >= 9000
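The uniform sharding loop can be tried out in isolation. The following is a minimal sketch that assumes a long-valued sharding column and represents each split as a `Long[]{start, end}` pair, with `null` standing for an open end; these representations and the class name are assumptions of the sketch. The demo uses IDs 0 to 9999 so that the boundaries land on round thousands.

```java
import java.util.ArrayList;
import java.util.List;

class EvenChunkSketch {
    // Returns {start, end} pairs; null means "unbounded" on that side.
    static List<Long[]> splitEvenly(long min, long max, double distributionFactor, int chunkSize) {
        // Scale the step by the distribution factor so each split holds roughly chunkSize rows.
        int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);

        List<Long[]> splits = new ArrayList<>();
        Long chunkStart = null;
        long chunkEnd = min + dynamicChunkSize;
        while (chunkEnd <= max) {
            splits.add(new Long[] {chunkStart, chunkEnd});
            chunkStart = chunkEnd;
            // addExact fails fast on overflow instead of silently wrapping around.
            chunkEnd = Math.addExact(chunkEnd, dynamicChunkSize);
        }
        // The last split is open-ended so rows above the recorded max are still covered.
        splits.add(new Long[] {chunkStart, null});
        return splits;
    }

    public static void main(String[] args) {
        for (Long[] split : splitEvenly(0L, 9_999L, 1.0, 1_000)) {
            System.out.println("[" + split[0] + ", " + split[1] + "]");
        }
    }
}
```

No query runs inside the loop: once MIN, MAX, and the row count are known, every boundary is plain arithmetic, which is why this strategy is the cheapest of the three.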







    3.2 Non-Uniform Sharding (Unevenly Sized Chunks)

    Applicable scenarios: Non-uniform data distribution or non-numeric columns


    Sharding logic:






    // Continuously query the maximum value of the next chunk
    Object chunkStart = null
    Object chunkEnd = queryNextChunkMax(jdbc, min, tableId, splitColumn, max, chunkSize)

    while (chunkEnd != null && chunkEnd <= max) {
        splits.add(ChunkRange.of(chunkStart, chunkEnd))
        chunkStart = chunkEnd
        chunkEnd = queryNextChunkMax(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize)
    }
    splits.add(ChunkRange.of(chunkStart, null))







    SQL example:






    -- Query the maximum value of the next chunk
    SELECT MAX(split_column) FROM (
    SELECT split_column FROM table_name
    WHERE split_column >= ?
    ORDER BY split_column
    LIMIT ?
    ) t







    Example:






    Table: order_table, Sharding column: create_time, chunkSize=1000

    Query process:
    1. Query the maximum create_time of the first 1000 rows → '2023-01-15 10:30:00'
    2. Query the maximum create_time of the next 1000 rows → '2023-02-20 15:45:00'
    3. Continue querying...

    Sharding result:
    Split1: [null, '2023-01-15 10:30:00']
    Split2: ['2023-01-15 10:30:00', '2023-02-20 15:45:00']
    Split3: ['2023-02-20 15:45:00', '2023-03-25 09:20:00']
    ...
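To see how the boundaries follow the data rather than the value range, the query loop can be simulated in memory. This sketch replaces the SQL query with a lookup over a sorted `long[]`, so it uses numeric values instead of the datetime strings in the example above. Two guards are assumptions of the sketch, not taken from the pseudocode: it stops once a candidate boundary reaches the maximum value, and it jumps to the next larger value when a whole chunk consists of duplicates, so the loop cannot stall.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnevenChunkSketch {
    // Index of the first element >= key in a sorted array.
    static int lowerBound(long[] sorted, long key) {
        int lo = 0;
        int hi = sorted.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sorted[mid] < key) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // In-memory stand-in for "SELECT MAX(col) FROM (SELECT col ... WHERE col >= ? ORDER BY col LIMIT ?)".
    // Returns null when no further boundary is needed.
    static Long nextChunkEnd(long[] sorted, long previousEnd, int chunkSize) {
        long max = sorted[sorted.length - 1];
        int from = lowerBound(sorted, previousEnd);
        int last = Math.min(from + chunkSize, sorted.length) - 1;
        long candidate = sorted[last];
        if (candidate == previousEnd) {
            if (previousEnd >= max) {
                return null;
            }
            // Every row in the chunk has the same value: move to the next larger value.
            candidate = sorted[lowerBound(sorted, previousEnd + 1)];
        }
        return candidate >= max ? null : candidate;
    }

    // Returns {start, end} pairs; null means "unbounded" on that side.
    static List<Long[]> splitUnevenly(long[] values, int chunkSize) {
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        List<Long[]> splits = new ArrayList<>();
        Long chunkStart = null;
        Long chunkEnd = sorted.length == 0 ? null : nextChunkEnd(sorted, sorted[0], chunkSize);
        while (chunkEnd != null) {
            splits.add(new Long[] {chunkStart, chunkEnd});
            chunkStart = chunkEnd;
            chunkEnd = nextChunkEnd(sorted, chunkEnd, chunkSize);
        }
        splits.add(new Long[] {chunkStart, null});
        return splits;
    }

    public static void main(String[] args) {
        long[] ids = {1, 2, 3, 50, 51, 52, 1000, 1001, 5000};
        for (Long[] split : splitUnevenly(ids, 3)) {
            System.out.println("[" + split[0] + ", " + split[1] + "]");
        }
    }
}
```

Each boundary costs one lookup here and one query against the real table, which is the reason this strategy becomes expensive once the estimated number of splits grows large.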







    3.3 Sampling-Based Sharding

    Applicable scenarios: Large tables with extremely non-uniform data distribution


    Trigger conditions:






    // Enable when the estimated number of splits exceeds the threshold
    int shardCount = (int) (approximateRowCount / chunkSize)
    if (sampleShardingThreshold < shardCount) {
        // Use sampling-based sharding
    }
    // Default threshold: 1000







    Sampling logic:






    // Sample data
    Object[] sampleData = sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate)

    // Calculate the number of samples per split
    double approxSamplePerShard = (double) sampleData.length / shardCount

    // Determine the split boundaries based on the sample data
    Object lastEnd = null
    for (int i = 0; i < shardCount; i++) {
        Object chunkStart = lastEnd
        Object chunkEnd = (i < shardCount - 1)
            ? sampleData[(int) ((i + 1) * approxSamplePerShard)]
            : null
        splits.add(ChunkRange.of(chunkStart, chunkEnd))
        lastEnd = chunkEnd // The next split starts where this one ends
    }







    Example:






    Table: big_table, Number of rows: 10 million, chunkSize=10000, Estimated number of splits=1000
    inverseSamplingRate=1000 (sampling rate 1/1000)

    Sampling process:
    1. Sample 10000 rows of data from the table
    2. Sort the sample data by the sharding column
    3. According to the requirement of 1000 splits, determine a split boundary every 10 samples

    Sharding result: Boundaries determined based on the distribution of the sample data
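The boundary selection from samples can be sketched as follows, assuming the samples are long values that have already been fetched and sorted. The class name, the `Long[]{start, end}` split representation, and the guard that skips a boundary equal to the previous one are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class SamplingChunkSketch {
    // Picks every (samples / shardCount)-th sample as a split boundary.
    // Returns {start, end} pairs; null means "unbounded" on that side.
    static List<Long[]> splitBySamples(long[] sortedSamples, int shardCount) {
        List<Long[]> splits = new ArrayList<>();
        if (sortedSamples.length == 0 || shardCount <= 1) {
            splits.add(new Long[] {null, null});
            return splits;
        }
        double approxSamplePerShard = (double) sortedSamples.length / shardCount;
        Long lastEnd = null;
        for (int i = 0; i < shardCount; i++) {
            Long chunkStart = lastEnd;
            Long chunkEnd = null;
            if (i < shardCount - 1) {
                chunkEnd = sortedSamples[(int) ((i + 1) * approxSamplePerShard)];
            }
            // Assumed guard: skip an empty range when two boundaries coincide.
            if (chunkEnd != null && chunkEnd.equals(chunkStart)) {
                continue;
            }
            splits.add(new Long[] {chunkStart, chunkEnd});
            lastEnd = chunkEnd;
        }
        return splits;
    }

    public static void main(String[] args) {
        long[] samples = new long[100];
        for (int i = 0; i < samples.length; i++) {
            samples[i] = i * 10L;
        }
        for (Long[] split : splitBySamples(samples, 4)) {
            System.out.println("[" + split[0] + ", " + split[1] + "]");
        }
    }
}
```

Compared with querying one boundary at a time, this approach reads the samples once and derives all boundaries locally, trading some balance between splits for far fewer queries.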







    4. Detailed Explanation of SQL Query Cases and Code Implementation

    4.1 Mapping of Core SQL Query Methods

    Query type | Method | File | Purpose
    MIN/MAX query | queryMinMax() | MySqlUtils.java | Obtain the minimum and maximum values of the sharding column
    Row count query | queryApproximateRowCnt() | MySqlUtils.java | Obtain the approximate number of rows in the table
    Dynamic boundary query | queryNextChunkMax() | MySqlUtils.java | Boundary calculation for non-uniform sharding
    Sample data query | sampleDataFromColumn() | MySqlUtils.java | Data collection for the sampling strategy
    String hash query | hashModForField() | MysqlDialect.java | Hash sharding for string types


    5. Summary of SQL Query Patterns

    5.1 Comparison of SQL Query Patterns for Each Strategy

    Strategy | Column type | Query pattern | Example SQL
    Uniform Sharding | Numeric | WHERE col >= start AND col < end | WHERE order_id >= 1 AND order_id < …
    Uniform Sharding | String | Hash modulo query | WHERE ABS(CRC32(name) % 4) = 0
    Non-uniform Sharding | Numeric | Dynamic boundary query | SELECT MAX(id) FROM (SELECT id FROM table WHERE id >= ? ORDER BY id LIMIT ?)
    Non-uniform Sharding | String | Hash modulo query | WHERE ABS(MD5(name) % 4) = 0
    Sampling-based Sharding | Numeric | Sampling + boundary query | WHERE MOD((id - (SELECT MIN(id) FROM table)), 1000) = 0
    Sampling-based Sharding | String | String sampling query | WHERE ABS(CRC32(name) % 1000) = 0
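The hash modulo pattern for string columns assigns each value to one of N buckets, and each split reads a single bucket. The sketch below shows the same idea on the client side with `java.util.zip.CRC32`. It is only an illustration: the class and method names are hypothetical, and whether the bucket numbers match what MySQL computes for the same string depends on the column's character set, which this sketch simply assumes to be UTF-8.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class StringHashBucketSketch {
    // Maps a string to a bucket in [0, bucketCount), analogous to CRC32(name) % bucketCount.
    static int bucketOf(String value, int bucketCount) {
        CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        // getValue() is an unsigned 32-bit checksum held in a long, so the remainder is never negative.
        return (int) (crc.getValue() % bucketCount);
    }

    public static void main(String[] args) {
        String[] names = {"alice", "bob", "carol", "dave", "erin"};
        for (String name : names) {
            System.out.println(name + " -> bucket " + bucketOf(name, 4));
        }
    }
}
```

Because the bucket depends only on the value, every row lands in exactly one split no matter how the strings are distributed, at the cost of a predicate that cannot use a range scan on the column.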


    5.2 Comparison of SQL Differences Across Databases

    Database | Hash function | Row count source | Row limit syntax
    MySQL | MD5(field) | SHOW TABLE STATUS | LIMIT n
    PostgreSQL | HASHTEXT(field) | pg_class.reltuples | LIMIT n
    SQL Server | HASHBYTES('MD5', field) | sys.dm_db_partition_stats | TOP n
    Oracle | ORA_HASH(field) | all_tables.num_rows | ROWNUM


    6. Performance Optimization and Configuration

    6.1 Distribution Factor Tuning





    # Distribution factor configuration
    chunk-key.even-distribution.factor.upper-bound = 100.0 # Upper limit, default 100.0
    chunk-key.even-distribution.factor.lower-bound = 0.05 # Lower limit, default 0.05







    Parameter Description:
    • chunk-key.even-distribution.factor.upper-bound: Upper bound of the uniform distribution factor, used to determine if data is uniformly distributed
    • chunk-key.even-distribution.factor.lower-bound: Lower bound of the uniform distribution factor, calculated as: (MAX(id) - MIN(id) + 1) / row count


    6.2 Sampling Strategy Tuning





    # Sampling configuration
    sample-sharding.threshold = 1000 # Sampling threshold, default 1000
    inverse-sampling.rate = 1000 # Inverse of sampling rate, default 1000







    Parameter Description:
    • sample-sharding.threshold: Threshold of estimated number of splits to trigger the sampling sharding strategy
    • inverse-sampling.rate: Inverse of the sampling rate, e.g., 1000 means a sampling rate of 1/1000


    6.3 Snapshot Sharding Configuration





    # Snapshot sharding configuration
    snapshot.split.size = 8096 # Split size, default 8096 rows
    snapshot.fetch.size = 1024 # Fetch size per batch, default 1024 rows







    Parameter Description:
    • snapshot.split.size: Size of table snapshot splits (in rows)
    • snapshot.fetch.size: Maximum fetch size per poll when reading table snapshots


    6.4 Core Configuration Parameters

    Parameter | Type | Default | Description
    snapshot.split.size | Integer | 8096 | Size of table snapshot splits (in rows)
    snapshot.fetch.size | Integer | 1024 | Maximum number of rows fetched per batch when reading snapshots
    chunk-key.even-distribution.factor.upper-bound | Double | 100.0 | Upper bound of the uniform distribution factor
    chunk-key.even-distribution.factor.lower-bound | Double | 0.05 | Lower bound of the uniform distribution factor
    sample-sharding.threshold | Integer | 1000 | Threshold for sampling-based sharding
    inverse-sampling.rate | Integer | 1000 | Inverse of the sampling rate
    server-id | String | Randomly generated | Unique ID for the database client
    server-time-zone | String | UTC | Session time zone of the database server
    connect.timeout.ms | Duration | 30000 | Connection timeout (milliseconds)
    connect.max-retries | Integer | 3 | Maximum number of retries
    connection.pool.size | Integer | 20 | JDBC connection pool size


    6.5 Configuration Example





    source {
    Mysql-CDC {
    # Basic connection configuration
    url = "jdbc:mysql://localhost:3306/test"
    username = "root"
    password = "123456"
    table-names = ["test.user_table"]

    # Snapshot sharding configuration
    snapshot.split.size = 8096
    snapshot.fetch.size = 1024

    # Distribution factor configuration
    chunk-key.even-distribution.factor.upper-bound = 100.0
    chunk-key.even-distribution.factor.lower-bound = 0.05

    # Sampling strategy configuration
    sample-sharding.threshold = 1000
    inverse-sampling.rate = 1000

    # Connection configuration
    server-id = "5400"
    server-time-zone = "Asia/Shanghai"
    connect.timeout.ms = 30000
    connect.max-retries = 3
    connection.pool.size = 20

    # Startup mode
    startup.mode = "initial"

    # Other configurations
    exactly_once = false
    format = "DEFAULT"
    }
    }







    7. Sharding Strategy Control and On-Site Application

    7.1 Summary of Strategy Control Parameters

    By adjusting key parameters, you can precisely control which sharding strategy SeaTunnel uses to cope with different on-site scenarios:


    1. Force Using Uniform Sharding Strategy

    Applicable Scenario: Relatively uniform data distribution, pursuing optimal parallel performance






    source {
    Mysql-CDC {
    url = "jdbc:mysql://localhost:3306/test"
    username = "root"
    password = "123456"
    table-names = ["test.uniform_table"]

    # Force uniform sharding configuration
    chunk-key.even-distribution.factor.upper-bound = 10000.0 # Significantly increase the upper bound
    chunk-key.even-distribution.factor.lower-bound = 0.001 # Significantly decrease the lower bound
    sample-sharding.threshold = 100000 # Extremely high threshold to avoid sampling
    snapshot.split.size = 8096 # Standard split size
    }
    }







    2. Force Using Non-uniform Sharding Strategy

    Applicable Scenario: Uneven data distribution, but the table is not particularly large






    source {
    Mysql-CDC {
    url = "jdbc:mysql://localhost:3306/test"
    username = "root"
    password = "123456"
    table-names = ["test.sparse_table"]

    # Force non-uniform sharding configuration
    chunk-key.even-distribution.factor.upper-bound = 0.1 # Upper bound set equal to the lower bound
    chunk-key.even-distribution.factor.lower-bound = 0.1 # Only a factor of exactly 0.1 would count as uniform
    sample-sharding.threshold = 100000 # Extremely high threshold to avoid sampling
    snapshot.split.size = 5000 # Moderate split size
    }
    }







    3. Force Using Sampling-Based Sharding Strategy

    Applicable Scenario: Super large tables requiring efficient sharding






    source {
    Mysql-CDC {
    url = "jdbc:mysql://localhost:3306/test"
    username = "root"
    password = "123456"
    table-names = ["test.huge_table"]

    # Force sampling-based sharding configuration
    chunk-key.even-distribution.factor.upper-bound = 0.01 # Upper bound set equal to the lower bound
    chunk-key.even-distribution.factor.lower-bound = 0.01 # Only a factor of exactly 0.01 would count as uniform
    sample-sharding.threshold = 100 # Extremely low threshold to force sampling
    inverse-sampling.rate = 500 # Increase sampling rate
    snapshot.split.size = 10000 # Larger split size
    }
    }







    4. Avoid Sampling Strategy (High Business Database Pressure)

    Applicable Scenario: Large tables but with high business database pressure, cannot use sampling






    source {
    Mysql-CDC {
    url = "jdbc:mysql://localhost:3306/test"
    username = "root"
    password = "123456"
    table-names = ["test.large_table"]

    # Avoid sampling configuration
    chunk-key.even-distribution.factor.upper-bound = 1000.0 # Relax upper bound
    chunk-key.even-distribution.factor.lower-bound = 0.001 # Relax lower bound
    sample-sharding.threshold = 50000 # Extremely high threshold
    snapshot.split.size = 50000 # Large splits to reduce total count
    connection.pool.size = 5 # Reduce connection count
    snapshot.fetch.size = 1024 # Control fetch size
    }
    }







    5. High Parallel Performance Optimization

    Applicable Scenario: Pursuing maximum parallelism and processing speed






    source {
    Mysql-CDC {
    url = "jdbc:mysql://localhost:3306/test"
    username = "root"
    password = "123456"
    table-names = ["test.performance_table"]

    # High parallelism configuration
    snapshot.split.size = 2000 # Small splits to increase parallelism
    snapshot.fetch.size = 2048 # Increase fetch size
    connection.pool.size = 30 # Increase connection pool
    chunk-key.even-distribution.factor.upper-bound = 1000.0 # Prioritize uniform sharding
    sample-sharding.threshold = 10000 # Moderate threshold
    }
    }







    7.2 Parameter Decision Matrix

    Scenario | snapshot.split.size | factor.upper-bound | factor.lower-bound | sample-sharding.threshold | Resulting strategy
    Uniform data, pursuing performance | 2000-8096 | 10000.0 | 0.001 | 100000 | Uniform sharding
    Sparse data, medium table | 5000-10000 | 0.1 | 0.1 | 100000 | Non-uniform sharding
    Super large table, allowing sampling | 10000+ | 0.01 | 0.01 | 100 | Sampling-based sharding
    Large table, high business DB pressure | 50000+ | 1000.0 | 0.001 | 50000 | Avoid sampling
    High parallelism requirement | 2000 | 1000.0 | 0.001 | 10000 | Uniform sharding


    7.3 Explanation of Core Control Parameters

    Strategy Selection Control:

    • chunk-key.even-distribution.factor.upper-bound: Controls whether to use uniform sharding
    • chunk-key.even-distribution.factor.lower-bound: Controls the sensitivity of distribution judgment
    • sample-sharding.threshold: Controls whether to trigger the sampling strategy


    Performance Tuning Control:

    • snapshot.split.size: Controls parallelism and memory usage
    • snapshot.fetch.size: Controls database query pressure
    • connection.pool.size: Controls database connection pressure
    • inverse-sampling.rate: Controls sampling accuracy


    8. Summary

    The table sharding mechanism of the SeaTunnel MySQL CDC connector is implemented through the following core components:

    1. AbstractJdbcSourceChunkSplitter: Core sharding logic
    2. MySqlUtils: MySQL-specific SQL query implementation
    3. JdbcDialect: Database dialect support


    Three sharding strategies:
    • Uniform sharding: Suitable for numeric columns with uniform data distribution
    • Non-uniform sharding: Suitable for scenarios with sparse data distribution
    • Sampling-based sharding: Suitable for efficient sharding of super large tables


    Decision-making mechanism:
    • Judges data distribution characteristics through the distribution factor
    • Selects appropriate sharding strategies based on table size
    • Supports special handling for various data types


    Through precise parameter control, this mechanism can cope with various complex on-site scenarios, ensuring that MySQL CDC can achieve efficient and balanced data sharding when processing tables of various sizes and types.


    Appendix | Sharding Process




    SeaTunnel Sharding Strategy Decision Parameters
