Kafka 4.2.0 on Kubernetes - Complete Setup Guide - Exposed to Internet

  • MyrinNew
    Senior Member
    • Feb 2024
    • 5175


    3-broker Kafka cluster on k3s with KRaft, SASL/SCRAM, and external access via Traefik.


    Before you start — get your Traefik IP:






    kubectl get svc traefik -n kube-system







    Copy any IP from the EXTERNAL-IP column, then replace every occurrence of 192.168.1.119 in the YAML files below with it.
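The search-and-replace can be scripted if you prefer not to edit the files by hand. This is only a minimal sketch: `replace_ip` and `patch_files` are hypothetical helper names, the file names are the ones used later in this guide, and 10.0.0.5 is a placeholder for your own Traefik IP.

```python
from pathlib import Path

PLACEHOLDER_IP = "192.168.1.119"  # the example IP used throughout this guide


def replace_ip(text: str, new_ip: str, old_ip: str = PLACEHOLDER_IP) -> str:
    """Return text with every occurrence of old_ip swapped for new_ip."""
    return text.replace(old_ip, new_ip)


def patch_files(paths, new_ip: str) -> list:
    """Rewrite each existing file in place; return the names that changed."""
    changed = []
    for name in paths:
        path = Path(name)
        if not path.exists():
            continue  # skip manifests you have not created yet
        original = path.read_text()
        patched = replace_ip(original, new_ip)
        if patched != original:
            path.write_text(patched)
            changed.append(name)
    return changed


if __name__ == "__main__":
    # 10.0.0.5 is a placeholder; use the EXTERNAL-IP reported by kubectl.
    files = ["kafka-stateful-bootstrap.yaml", "kafka-stateful-final.yaml"]
    print(patch_files(files, "10.0.0.5"))
```

The Python client near the end of the guide contains the same address and needs the same change.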



    Stage 1 — Bootstrap

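    Apply the files in this stage in order. The cluster starts with an unauthenticated PLAINTEXT listener on pod port 9094 so that you can register SCRAM credentials. That listener is reachable only inside the cluster; Traefik's own 9094 entrypoint forwards to the authenticated listener on kafka-2.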



    1.1 namespace.yaml




    apiVersion: v1
    kind: Namespace
    metadata:
      name: kafka









    kubectl apply -f namespace.yaml







    1.2 kafka-jaas.yaml




    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-jaas
      namespace: kafka
    data:
      jaas.conf: |
        KafkaServer {
          org.apache.kafka.common.security.scram.ScramLoginModule required
          username="admin"
          password="supersecret";
        };









    kubectl apply -f kafka-jaas.yaml







    1.3 kafka-sasl.yaml




    apiVersion: v1
    kind: Secret
    metadata:
      name: kafka-sasl
      namespace: kafka
    type: Opaque
    stringData:
      username: admin
      password: supersecret









    kubectl apply -f kafka-sasl.yaml







    1.4 traefik-config.yaml

    Opens three TCP entrypoints on Traefik — one per broker.






    apiVersion: helm.cattle.io/v1
    kind: HelmChartConfig
    metadata:
      name: traefik
      namespace: kube-system
    spec:
      valuesContent: |-
        ports:
          kafka-0:
            port: 9092
            expose:
              default: true
            exposedPort: 9092
            protocol: TCP
          kafka-1:
            port: 9093
            expose:
              default: true
            exposedPort: 9093
            protocol: TCP
          kafka-2:
            port: 9094
            expose:
              default: true
            exposedPort: 9094
            protocol: TCP











    kubectl apply -f traefik-config.yaml
    kubectl rollout status deployment/traefik -n kube-system







    Verify ports appeared:






    kubectl get svc traefik -n kube-system
    # PORT(S) should include 9092, 9093, 9094 alongside 80 and 443










    1.5 kafka-traefik.yaml

    A headless service for internal pod DNS, one ClusterIP service per broker (Traefik v3 routes to port 9095 on each pod), and the IngressRouteTCP rules.






    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-headless
      namespace: kafka
    spec:
      clusterIP: None
      selector:
        app: kafka
      ports:
        - name: internal
          port: 9092
        - name: controller
          port: 9093
        - name: plaintext-bootstrap
          port: 9094
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-0-external
      namespace: kafka
    spec:
      type: ClusterIP
      selector:
        app: kafka
        statefulset.kubernetes.io/pod-name: kafka-0
      ports:
        - name: external
          port: 9092
          targetPort: 9095
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-1-external
      namespace: kafka
    spec:
      type: ClusterIP
      selector:
        app: kafka
        statefulset.kubernetes.io/pod-name: kafka-1
      ports:
        - name: external
          port: 9092
          targetPort: 9095
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-2-external
      namespace: kafka
    spec:
      type: ClusterIP
      selector:
        app: kafka
        statefulset.kubernetes.io/pod-name: kafka-2
      ports:
        - name: external
          port: 9092
          targetPort: 9095
    ---
    apiVersion: traefik.io/v1alpha1
    kind: IngressRouteTCP
    metadata:
      name: kafka-0-tcp
      namespace: kafka
    spec:
      entryPoints:
        - kafka-0
      routes:
        - match: HostSNI(`*`)
          services:
            - name: kafka-0-external
              port: 9092
    ---
    apiVersion: traefik.io/v1alpha1
    kind: IngressRouteTCP
    metadata:
      name: kafka-1-tcp
      namespace: kafka
    spec:
      entryPoints:
        - kafka-1
      routes:
        - match: HostSNI(`*`)
          services:
            - name: kafka-1-external
              port: 9092
    ---
    apiVersion: traefik.io/v1alpha1
    kind: IngressRouteTCP
    metadata:
      name: kafka-2-tcp
      namespace: kafka
    spec:
      entryPoints:
        - kafka-2
      routes:
        - match: HostSNI(`*`)
          services:
            - name: kafka-2-external
              port: 9092











    kubectl apply -f kafka-traefik.yaml
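The three per-broker Service and IngressRouteTCP pairs in kafka-traefik.yaml differ only in the broker ordinal, so they could also be generated. The sketch below is one possible way to do that, not part of the original setup: the function names are hypothetical, the headless service is not included, and it emits JSON wrapped in a `List` object on the assumption that you would pipe it to `kubectl apply -f -` instead of maintaining the YAML by hand.

```python
import json


def broker_service(i: int) -> dict:
    """ClusterIP Service that pins traffic to pod kafka-<i>, port 9095."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": f"kafka-{i}-external", "namespace": "kafka"},
        "spec": {
            "type": "ClusterIP",
            "selector": {
                "app": "kafka",
                "statefulset.kubernetes.io/pod-name": f"kafka-{i}",
            },
            "ports": [{"name": "external", "port": 9092, "targetPort": 9095}],
        },
    }


def broker_route(i: int) -> dict:
    """IngressRouteTCP that binds Traefik entrypoint kafka-<i> to that Service."""
    return {
        "apiVersion": "traefik.io/v1alpha1",
        "kind": "IngressRouteTCP",
        "metadata": {"name": f"kafka-{i}-tcp", "namespace": "kafka"},
        "spec": {
            "entryPoints": [f"kafka-{i}"],
            "routes": [{
                "match": "HostSNI(`*`)",
                "services": [{"name": f"kafka-{i}-external", "port": 9092}],
            }],
        },
    }


def manifests(replicas: int = 3) -> dict:
    """Collect one Service and one route per broker into a single List object."""
    items = []
    for i in range(replicas):
        items += [broker_service(i), broker_route(i)]
    return {"apiVersion": "v1", "kind": "List", "items": items}


if __name__ == "__main__":
    print(json.dumps(manifests(), indent=2))
```

Adding a fourth broker this way would still need a matching entrypoint in traefik-config.yaml.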










    1.6 kafka-stateful-bootstrap.yaml


    ⚠️ Replace 192.168.1.119 with your Traefik IP before applying.


    Includes:
    • EXTERNAL listener on port 9095 — Traefik routes external traffic here
    • PLAINTEXT listener on port 9094 — temporary, no auth, used to register SCRAM credentials
    • init container calculates the external port per broker: kafka-0 → 9092, kafka-1 → 9093, kafka-2 → 9094




    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: kafka
      namespace: kafka
    spec:
      serviceName: kafka-headless
      replicas: 3
      selector:
        matchLabels:
          app: kafka
      template:
        metadata:
          labels:
            app: kafka
        spec:
          securityContext:
            fsGroup: 1001
          volumes:
            - name: kafka-config
              emptyDir: {}
            - name: kafka-jaas
              configMap:
                name: kafka-jaas
          initContainers:
            - name: init-node-id
              image: busybox:1.36
              command:
                - sh
                - -c
                - |
                  ORDINAL=$(hostname | awk -F'-' '{print $NF}')
                  echo "$ORDINAL" > /config/node-id
                  EXTERNAL_PORT=$((9092 + ORDINAL))
                  echo "$EXTERNAL_PORT" > /config/external-port
              volumeMounts:
                - name: kafka-config
                  mountPath: /config
            - name: format-storage
              image: apache/kafka:4.2.0
              command:
                - sh
                - -c
                - |
                  NODE_ID=$(cat /config/node-id)
                  if [ ! -f "/data/meta.properties" ]; then
                    echo "Formatting storage for node $NODE_ID..."
                    echo "node.id=$NODE_ID" > /tmp/kraft.properties
                    echo "process.roles=broker,controller" >> /tmp/kraft.properties
                    echo "controller.quorum.voters=0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093" >> /tmp/kraft.properties
                    echo "listeners=PLAINTEXT://:9092,CONTROLLER://:9093" >> /tmp/kraft.properties
                    echo "advertised.listeners=PLAINTEXT://localhost:9092" >> /tmp/kraft.properties
                    echo "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /tmp/kraft.properties
                    echo "inter.broker.listener.name=PLAINTEXT" >> /tmp/kraft.properties
                    echo "controller.listener.names=CONTROLLER" >> /tmp/kraft.properties
                    echo "log.dirs=/data" >> /tmp/kraft.properties
                    /opt/kafka/bin/kafka-storage.sh format \
                      --ignore-formatted \
                      --cluster-id q1Sh-9_ISia_zwGINzRvyQ \
                      --config /tmp/kraft.properties
                  else
                    echo "Already formatted, skipping."
                  fi
              volumeMounts:
                - name: kafka-data
                  mountPath: /data
                - name: kafka-config
                  mountPath: /config
          containers:
            - name: kafka
              image: apache/kafka:4.2.0
              command:
                - sh
                - -c
                - |
                  export KAFKA_NODE_ID=$(cat /config/node-id)
                  export EXTERNAL_PORT=$(cat /config/external-port)
                  export KAFKA_ADVERTISED_LISTENERS="INTERNAL://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9092,EXTERNAL://192.168.1.119:${EXTERNAL_PORT},PLAINTEXT://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9094"
                  exec /etc/kafka/docker/run
              ports:
                - containerPort: 9092
                - containerPort: 9093
                - containerPort: 9094
                - containerPort: 9095
              env:
                - name: CLUSTER_ID
                  value: "q1Sh-9_ISia_zwGINzRvyQ"
                - name: KAFKA_PROCESS_ROLES
                  value: "broker,controller"
                - name: KAFKA_CONTROLLER_LISTENER_NAMES
                  value: "CONTROLLER"
                - name: KAFKA_CONTROLLER_QUORUM_VOTERS
                  value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093"
                - name: KAFKA_LISTENERS
                  value: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9095,PLAINTEXT://:9094"
                - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
                  value: "INTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT"
                - name: KAFKA_INTER_BROKER_LISTENER_NAME
                  value: "INTERNAL"
                - name: POD_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
                - name: KAFKA_SASL_ENABLED_MECHANISMS
                  value: SCRAM-SHA-512
                - name: KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL
                  value: SCRAM-SHA-512
                - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
                  value: "3"
                - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
                  value: "3"
                - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
                  value: "2"
                - name: KAFKA_MIN_INSYNC_REPLICAS
                  value: "2"
                - name: KAFKA_LOG_DIRS
                  value: /data
                - name: KAFKA_OPTS
                  value: "-Djava.security.auth.login.config=/opt/kafka/config/jaas/jaas.conf"
              volumeMounts:
                - name: kafka-data
                  mountPath: /data
                - name: kafka-config
                  mountPath: /config
                - name: kafka-jaas
                  mountPath: /opt/kafka/config/jaas
              resources:
                requests:
                  cpu: 500m
                  memory: 1Gi
                limits:
                  cpu: "2"
                  memory: 4Gi
      volumeClaimTemplates:
        - metadata:
            name: kafka-data
          spec:
            accessModes: ["ReadWriteOnce"]
            resources:
              requests:
                storage: 10Gi











    kubectl apply -f kafka-stateful-bootstrap.yaml
    kubectl rollout status statefulset/kafka -n kafka
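The port arithmetic in the init-node-id container of 1.6 is easy to check in isolation. The sketch below mirrors the shell logic (ordinal is the text after the last hyphen in the pod name, external port is 9092 plus that ordinal); the function names are hypothetical and exist only for illustration.

```python
BASE_EXTERNAL_PORT = 9092  # Traefik entrypoint port for kafka-0 in this guide


def ordinal_of(pod_name: str) -> int:
    """Return the StatefulSet ordinal: the text after the last '-'."""
    return int(pod_name.rsplit("-", 1)[-1])


def external_port(pod_name: str) -> int:
    """Same arithmetic as EXTERNAL_PORT=$((9092 + ORDINAL)) in the init container."""
    return BASE_EXTERNAL_PORT + ordinal_of(pod_name)


def external_listener(pod_name: str, traefik_ip: str) -> str:
    """Build the EXTERNAL entry that ends up in KAFKA_ADVERTISED_LISTENERS."""
    return f"EXTERNAL://{traefik_ip}:{external_port(pod_name)}"


if __name__ == "__main__":
    for pod in ("kafka-0", "kafka-1", "kafka-2"):
        print(pod, external_listener(pod, "192.168.1.119"))
```

Each broker has to advertise a distinct port because all three share one Traefik IP; clients tell the brokers apart by port alone.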










    1.7 Register SCRAM credentials

    Port 9094 is open and unauthenticated. Use it to write the admin user into Kafka's metadata store. This is a one-time operation.






    kubectl exec -n kafka kafka-0 -- \
      /opt/kafka/bin/kafka-configs.sh \
      --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9094 \
      --alter \
      --add-config 'SCRAM-SHA-512=[password=supersecret]' \
      --entity-type users \
      --entity-name admin







    Expected output:






    Completed updating config for user admin.







    To add more users (application service accounts), repeat the command with different --entity-name and password values while port 9094 is still open.
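If you have several service accounts to register, the repeated command can be built in a loop. This is only a sketch: `scram_command` is a hypothetical helper, the user names and passwords are placeholders, and the script prints the commands rather than running them so you can review them first.

```python
import shlex

# Unauthenticated bootstrap listener from Stage 1 (closed again in Stage 2).
BOOTSTRAP = "kafka-0.kafka-headless.kafka.svc.cluster.local:9094"


def scram_command(user: str, password: str) -> list:
    """Build the kubectl argv that registers one SCRAM-SHA-512 user."""
    return [
        "kubectl", "exec", "-n", "kafka", "kafka-0", "--",
        "/opt/kafka/bin/kafka-configs.sh",
        "--bootstrap-server", BOOTSTRAP,
        "--alter",
        "--add-config", f"SCRAM-SHA-512=[password={password}]",
        "--entity-type", "users",
        "--entity-name", user,
    ]


if __name__ == "__main__":
    # Placeholder accounts; replace with your own names and secrets.
    users = {"orders-service": "change-me-1", "billing-service": "change-me-2"}
    for user, password in users.items():
        # shlex.join quotes the bracketed --add-config value for the shell.
        print(shlex.join(scram_command(user, password)))
```

Passing the same list to `subprocess.run` would execute the command directly and avoid shell quoting altogether.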



    Stage 2 — Production (close port 9094)

    The PLAINTEXT listener on port 9094 is no longer needed and is a security risk. Apply the final StatefulSet to remove it; no other files change.



    2.1 kafka-stateful-final.yaml


    ⚠️ Replace 192.168.1.119 with your Traefik IP before applying.


    Identical to the bootstrap version except:
    • PLAINTEXT://:9094 removed from KAFKA_LISTENERS
    • PLAINTEXT:PLAINTEXT removed from KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
    • PLAINTEXT entry removed from KAFKA_ADVERTISED_LISTENERS
    • containerPort: 9094 removed



    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: kafka
      namespace: kafka
    spec:
      serviceName: kafka-headless
      replicas: 3
      selector:
        matchLabels:
          app: kafka
      template:
        metadata:
          labels:
            app: kafka
        spec:
          securityContext:
            fsGroup: 1001
          volumes:
            - name: kafka-config
              emptyDir: {}
            - name: kafka-jaas
              configMap:
                name: kafka-jaas
          initContainers:
            - name: init-node-id
              image: busybox:1.36
              command:
                - sh
                - -c
                - |
                  ORDINAL=$(hostname | awk -F'-' '{print $NF}')
                  echo "$ORDINAL" > /config/node-id
                  EXTERNAL_PORT=$((9092 + ORDINAL))
                  echo "$EXTERNAL_PORT" > /config/external-port
              volumeMounts:
                - name: kafka-config
                  mountPath: /config
            - name: format-storage
              image: apache/kafka:4.2.0
              command:
                - sh
                - -c
                - |
                  NODE_ID=$(cat /config/node-id)
                  if [ ! -f "/data/meta.properties" ]; then
                    echo "Formatting storage for node $NODE_ID..."
                    echo "node.id=$NODE_ID" > /tmp/kraft.properties
                    echo "process.roles=broker,controller" >> /tmp/kraft.properties
                    echo "controller.quorum.voters=0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093" >> /tmp/kraft.properties
                    echo "listeners=PLAINTEXT://:9092,CONTROLLER://:9093" >> /tmp/kraft.properties
                    echo "advertised.listeners=PLAINTEXT://localhost:9092" >> /tmp/kraft.properties
                    echo "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /tmp/kraft.properties
                    echo "inter.broker.listener.name=PLAINTEXT" >> /tmp/kraft.properties
                    echo "controller.listener.names=CONTROLLER" >> /tmp/kraft.properties
                    echo "log.dirs=/data" >> /tmp/kraft.properties
                    /opt/kafka/bin/kafka-storage.sh format \
                      --ignore-formatted \
                      --cluster-id q1Sh-9_ISia_zwGINzRvyQ \
                      --config /tmp/kraft.properties
                  else
                    echo "Already formatted, skipping."
                  fi
              volumeMounts:
                - name: kafka-data
                  mountPath: /data
                - name: kafka-config
                  mountPath: /config
          containers:
            - name: kafka
              image: apache/kafka:4.2.0
              command:
                - sh
                - -c
                - |
                  export KAFKA_NODE_ID=$(cat /config/node-id)
                  export EXTERNAL_PORT=$(cat /config/external-port)
                  export KAFKA_ADVERTISED_LISTENERS="INTERNAL://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9092,EXTERNAL://192.168.1.119:${EXTERNAL_PORT}"
                  exec /etc/kafka/docker/run
              ports:
                - containerPort: 9092
                - containerPort: 9093
                - containerPort: 9095
              env:
                - name: CLUSTER_ID
                  value: "q1Sh-9_ISia_zwGINzRvyQ"
                - name: KAFKA_PROCESS_ROLES
                  value: "broker,controller"
                - name: KAFKA_CONTROLLER_LISTENER_NAMES
                  value: "CONTROLLER"
                - name: KAFKA_CONTROLLER_QUORUM_VOTERS
                  value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093"
                - name: KAFKA_LISTENERS
                  value: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9095"
                - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
                  value: "INTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT"
                - name: KAFKA_INTER_BROKER_LISTENER_NAME
                  value: "INTERNAL"
                - name: POD_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
                - name: KAFKA_SASL_ENABLED_MECHANISMS
                  value: SCRAM-SHA-512
                - name: KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL
                  value: SCRAM-SHA-512
                - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
                  value: "3"
                - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
                  value: "3"
                - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
                  value: "2"
                - name: KAFKA_MIN_INSYNC_REPLICAS
                  value: "2"
                - name: KAFKA_LOG_DIRS
                  value: /data
                - name: KAFKA_OPTS
                  value: "-Djava.security.auth.login.config=/opt/kafka/config/jaas/jaas.conf"
              volumeMounts:
                - name: kafka-data
                  mountPath: /data
                - name: kafka-config
                  mountPath: /config
                - name: kafka-jaas
                  mountPath: /opt/kafka/config/jaas
              resources:
                requests:
                  cpu: 500m
                  memory: 1Gi
                limits:
                  cpu: "2"
                  memory: 4Gi
      volumeClaimTemplates:
        - metadata:
            name: kafka-data
          spec:
            accessModes: ["ReadWriteOnce"]
            resources:
              requests:
                storage: 10Gi









    kubectl apply -f kafka-stateful-final.yaml
    kubectl rollout status statefulset/kafka -n kafka
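The listener changes between the bootstrap and final StatefulSet amount to dropping every entry named PLAINTEXT from three comma-separated values. The sketch below shows that transformation so you can check your edited strings against it; `drop_listener` is a hypothetical helper, not something Kafka or kubectl provides.

```python
def drop_listener(csv: str, name: str = "PLAINTEXT") -> str:
    """Remove entries whose listener name equals `name` from a comma-separated list.

    Works for both NAME://host:port entries (listeners, advertised listeners)
    and NAME:PROTOCOL entries (the security protocol map), because in both
    cases the listener name is the text before the first ':'.
    """
    kept = []
    for entry in csv.split(","):
        listener_name = entry.split(":", 1)[0]
        if listener_name != name:
            kept.append(entry)
    return ",".join(kept)


if __name__ == "__main__":
    listeners = "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9095,PLAINTEXT://:9094"
    protocol_map = (
        "INTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,"
        "EXTERNAL:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT"
    )
    print(drop_listener(listeners))
    print(drop_listener(protocol_map))
```

Matching on the name before the first colon matters: CONTROLLER:PLAINTEXT must survive, because there PLAINTEXT is the protocol and not the listener name.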







    2.2 Verify




    kubectl exec -n kafka kafka-0 -- \
      /opt/kafka/bin/kafka-metadata-quorum.sh \
      --bootstrap-controller kafka-0.kafka-headless.kafka.svc.cluster.local:9093 \
      describe --status






    Healthy output:






    LeaderId: 0
    MaxFollowerLag: 0
    MaxFollowerLagTimeMs: 0
    CurrentVoters: [{"id": 0, ...}, {"id": 1, ...}, {"id": 2, ...}]
    CurrentObservers: []
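If you want to check this from a script, the status text can be parsed as simple "Key: value" lines. The sketch below assumes the output has the shape shown above; the real tool prints more fields and its exact layout may differ between versions, so treat `is_healthy` and its thresholds as hypothetical.

```python
def parse_status(output: str) -> dict:
    """Split 'Key: value' lines into a dict, ignoring lines without a colon."""
    fields = {}
    for line in output.splitlines():
        key, sep, value = line.partition(":")
        if sep:
            fields[key.strip()] = value.strip()
    return fields


def is_healthy(output: str, expected_voters: int = 3, max_lag: int = 0) -> bool:
    """True when a leader exists, follower lag is within max_lag, and all voters are listed."""
    fields = parse_status(output)
    try:
        leader = int(fields["LeaderId"])
        lag = int(fields["MaxFollowerLag"])
    except (KeyError, ValueError):
        return False
    # Assumption: each voter appears as an object with an "id" key.
    voters = fields.get("CurrentVoters", "").count('"id"')
    return leader >= 0 and lag <= max_lag and voters == expected_voters


if __name__ == "__main__":
    sample = """
    LeaderId: 0
    MaxFollowerLag: 0
    MaxFollowerLagTimeMs: 0
    CurrentVoters: [{"id": 0, ...}, {"id": 1, ...}, {"id": 2, ...}]
    CurrentObservers: []
    """
    print(is_healthy(sample))
```

The leader does not have to be node 0; any non-negative LeaderId means the quorum has elected one.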










    2.3 Python client





    pip install kafka-python-ng











    import json
    import time

    from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
    from kafka.admin import NewTopic
    from kafka.errors import TopicAlreadyExistsError

    # Replace 192.168.1.119 with your Traefik IP
    BOOTSTRAP_SERVERS = [
        "192.168.1.119:9092",  # kafka-0
        "192.168.1.119:9093",  # kafka-1
        "192.168.1.119:9094",  # kafka-2
    ]

    SASL_CONFIG = {
        "security_protocol": "SASL_PLAINTEXT",
        "sasl_mechanism": "SCRAM-SHA-512",
        "sasl_plain_username": "admin",
        "sasl_plain_password": "supersecret",
    }

    TOPIC = "orders"


    def create_topic(topic):
        admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, **SASL_CONFIG)
        try:
            admin.create_topics([
                NewTopic(name=topic, num_partitions=3, replication_factor=3)
            ])
            print(f"Topic '{topic}' created.")
        except TopicAlreadyExistsError:
            print(f"Topic '{topic}' already exists.")
        finally:
            admin.close()


    def produce(topic=TOPIC, count=10):
        producer = KafkaProducer(
            bootstrap_servers=BOOTSTRAP_SERVERS,
            **SASL_CONFIG,
            value_serializer=lambda v: json.dumps(v).encode(),
            key_serializer=lambda k: k.encode() if k else None,
            acks="all",
        )
        for i in range(count):
            meta = producer.send(
                topic,
                key=str(i),
                value={"id": i, "ts": time.time()}
            ).get(timeout=10)
            print(f" sent [{i}] partition={meta.partition} offset={meta.offset}")
        producer.flush()
        producer.close()


    def consume(topic=TOPIC):
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=BOOTSTRAP_SERVERS,
            **SASL_CONFIG,
            value_deserializer=lambda v: json.loads(v.decode()),
            key_deserializer=lambda k: k.decode() if k else None,
            group_id="my-group",
            auto_offset_reset="earliest",
            consumer_timeout_ms=5000,
        )
        for msg in consumer:
            print(
                f" recv key={msg.key} "
                f"partition={msg.partition} "
                f"offset={msg.offset} "
                f"value={msg.value}"
            )
        consumer.close()


    if __name__ == "__main__":
        create_topic(TOPIC)
        produce()
        consume()










    Troubleshooting

    Pods not starting






    kubectl logs -n kafka kafka-0 -c format-storage
    kubectl logs -n kafka kafka-0 -c init-node-id







    SCRAM registration times out


    Port 9094 is not reachable. Check the headless service includes it:






    kubectl get svc kafka-headless -n kafka







    Python client connects but gets wrong broker addresses


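    The Traefik IP in the StatefulSet command block is wrong. Correct it in the manifest and re-apply the file; the changed pod template triggers a rolling update on its own. If the pods still advertise the old address, force a restart: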






    kubectl rollout restart statefulset/kafka -n kafka







    Traefik ports not appearing






    kubectl rollout restart deployment/traefik -n kube-system
    kubectl get svc traefik -n kube-system
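To confirm from outside the cluster that the three entrypoints accept connections, a plain TCP connect is enough. The sketch below uses only the standard library; `port_open` and `check_brokers` are hypothetical helpers, and a successful connect only shows that something is listening on the port, not that Kafka authentication works.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


def check_brokers(host: str, ports=(9092, 9093, 9094), timeout: float = 3.0) -> dict:
    """Map each external broker port to whether it accepted a connection."""
    return {port: port_open(host, port, timeout) for port in ports}
```

Calling `check_brokers("192.168.1.119")` with your own Traefik IP returns a dict keyed by port; any `False` entry points at the entrypoint or IngressRouteTCP for that broker.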








