Kafka MirrorMaker 2.0 Installation Guide

Because the mainstream Helm charts all have problems, this guide provides installation approaches that actually work.

🚀 Option 1: Deploy directly on Kubernetes (recommended)

This is the simplest and most reliable method, and it does not need Helm.

Create kafka-connect-deploy.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: kafka-mirror
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect
  namespace: kafka-mirror
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-connect
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
      - name: kafka-connect
        image: confluentinc/cp-kafka-connect:7.4.0
        ports:
        - containerPort: 8083
        env:
        # === Worker bootstrap (usually the target cluster) ===
        - name: CONNECT_BOOTSTRAP_SERVERS
          value: "kafka-v3b.dev.svc.cluster.local:9092"
        - name: CONNECT_REST_ADVERTISED_HOST_NAME
          value: "kafka-connect"
        - name: CONNECT_REST_PORT
          value: "8083"
        - name: CONNECT_GROUP_ID
          value: "mirror-cluster"
        - name: CONNECT_CONFIG_STORAGE_TOPIC
          value: "mm2-configs"
        - name: CONNECT_OFFSET_STORAGE_TOPIC
          value: "mm2-offsets"
        - name: CONNECT_STATUS_STORAGE_TOPIC
          value: "mm2-status"
        - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
          value: "1"

        # === Converter settings ===
        - name: CONNECT_KEY_CONVERTER
          value: "org.apache.kafka.connect.converters.ByteArrayConverter"
        - name: CONNECT_VALUE_CONVERTER
          value: "org.apache.kafka.connect.converters.ByteArrayConverter"
        - name: CONNECT_HEADER_CONVERTER
          value: "org.apache.kafka.connect.converters.ByteArrayConverter"

        # === Plugin path ===
        - name: CONNECT_PLUGIN_PATH
          value: "/usr/share/java,/usr/share/confluent-hub-components"

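        # === MirrorMaker 2 cluster settings ===
        # Note: these become worker properties (e.g. mirror.source.cluster.bootstrap.servers)
        # that the MirrorMaker connectors do not read; the settings that take effect are
        # the ones in the connector JSON submitted through the REST API later.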
        - name: CONNECT_MIRROR_SOURCE_CLUSTER_BOOTSTRAP_SERVERS
          value: "kafka-v3.dev.svc.cluster.local:9092"
        - name: CONNECT_MIRROR_TARGET_CLUSTER_BOOTSTRAP_SERVERS
          value: "kafka-v3b.dev.svc.cluster.local:9092"
        - name: CONNECT_HEARTBEAT_ENABLED
          value: "true"
        - name: CONNECT_CHECKPOINT_ENABLED
          value: "true"
        - name: CONNECT_MIRRORMAKER_TASKS_MAX
          value: "10"
        - name: CONNECT_MIRRORMAKER_POLL_TIMEOUT_MS
          value: "5000"

        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"

        livenessProbe:
          httpGet:
            path: /
            port: 8083
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /
            port: 8083
          initialDelaySeconds: 20
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect
  namespace: kafka-mirror
spec:
  selector:
    app: kafka-connect
  ports:
  - port: 8083
    targetPort: 8083
  type: ClusterIP
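As far as I know, the confluentinc/cp-kafka-connect image turns every CONNECT_* environment variable into a worker property by dropping the prefix, lowercasing the rest, and replacing underscores with dots. The hypothetical helper below reproduces only that basic rule (the image also has escape rules for property names that contain dashes or underscores, which this sketch does not handle), so you can see what each variable in the Deployment becomes:

```shell
#!/usr/bin/env bash
# Hypothetical helper: approximate how cp-kafka-connect maps CONNECT_* env vars
# to worker properties. Only the basic rule is implemented:
#   drop "CONNECT_", lowercase, "_" -> "."
connect_env_to_prop() {
  local name=${1#CONNECT_}
  # tr keeps this working on bash 3, which lacks ${var,,}
  name=$(printf '%s' "$name" | tr '[:upper:]' '[:lower:]')
  printf '%s\n' "${name//_/.}"
}
```

For example, CONNECT_BOOTSTRAP_SERVERS becomes bootstrap.servers, while CONNECT_MIRROR_SOURCE_CLUSTER_BOOTSTRAP_SERVERS becomes mirror.source.cluster.bootstrap.servers, a worker-level key rather than a MirrorMaker connector setting. To list the mapping for the whole file: grep -o 'CONNECT_[A-Z_]*' kafka-connect-deploy.yaml | sort -u | while read -r v; do echo "$v -> $(connect_env_to_prop "$v")"; done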

One-step deployment

# Deploy
kubectl apply -f kafka-connect-deploy.yaml

# Check status (-w keeps watching; press Ctrl+C to stop)
kubectl get pods -n kafka-mirror -w

# Once the pod is Running, check the logs
kubectl logs -f deployment/kafka-connect -n kafka-mirror

🎯 Option 2: Use a third-party Helm chart

Try the licenseware chart

# Add the repo
helm repo add licenseware https://licenseware.github.io/charts/
helm repo update

# Check the chart
helm search repo kafka-connect

# Create values.yaml
cat > values.yaml << 'EOF'
image:
  repository: confluentinc/cp-kafka-connect
  tag: "7.4.0"

kafka:
  bootstrapServers: "kafka-v3:9092"

connect:
  groupId: "connect-cluster"
  configStorageTopic: "connect-config"
  offsetStorageTopic: "connect-offset"
  statusStorageTopic: "connect-status"

resources:
  requests:
    memory: "1Gi"
    cpu: "500m"
EOF

# Install
helm install mm2 licenseware/kafka-connect \
  --namespace kafka-mirror \
  --create-namespace \
  --values values.yaml

🔧 Configure MirrorMaker 2.0

The following steps are the same whichever option you used:

1. Set up port forwarding

kubectl port-forward svc/kafka-connect 8083:8083 -n kafka-mirror &

2. Wait for the service to become ready

# Check whether Kafka Connect is ready
while ! curl -f http://localhost:8083/ >/dev/null 2>&1; do
  echo "Waiting for Kafka Connect to start..."
  sleep 5
done
echo "Kafka Connect is ready!"
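The loop above waits forever if the pod never comes up. A minimal sketch of a bounded version, assuming curl and the port-forward from step 1; the function name wait_for_connect and the 300-second default are illustrative choices, not part of the original setup:

```shell
#!/usr/bin/env bash
# Bounded variant of the readiness loop: poll the Kafka Connect REST root
# until it answers, or give up after TIMEOUT seconds.
# Arguments (all optional): URL, TIMEOUT seconds, poll INTERVAL seconds.
wait_for_connect() {
  local url=${1:-http://localhost:8083/}
  local timeout=${2:-300}
  local interval=${3:-5}
  local deadline=$(( SECONDS + timeout ))

  # -f: treat HTTP errors as failures; --max-time: don't hang on one request
  until curl -fs -o /dev/null --max-time 5 "$url"; do
    if (( SECONDS >= deadline )); then
      echo "Kafka Connect not ready after ${timeout}s: $url" >&2
      return 1
    fi
    echo "Waiting for Kafka Connect to start..."
    sleep "$interval"
  done
  echo "Kafka Connect is ready!"
}
```

In a script this could be used as wait_for_connect http://localhost:8083/ 600 5 || exit 1, so a broken deployment fails the script instead of hanging it.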

3. Create the MirrorMaker configuration

For a fresh deployment, create mm2-mirror.json:

{
  "name": "mirror-v3-to-v3b",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": "4",

    "topics": ".*",
    "topics.exclude": "^__.*",
    "groups": ".*",

    "source.cluster.alias": "v3",
    "target.cluster.alias": "v3b",
    "source.cluster.bootstrap.servers": "kafka-v3.dev.svc.cluster.local:9092",
    "target.cluster.bootstrap.servers": "kafka-v3b.dev.svc.cluster.local:9092",

    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",

    "sync.topic.configs.enabled": "true",
    "sync.topic.acls.enabled": "true",

    "emit.heartbeats.enabled": "true",
    "emit.checkpoints.enabled": "true",
    "emit.heartbeats.interval.seconds": "5",
    "emit.checkpoints.interval.seconds": "60",

    "checkpoints.topic.replication.factor": "1",
    "heartbeats.topic.replication.factor": "1",
    "offset-syncs.topic.replication.factor": "1",
    "offset.syncs.topic.location": "target",

    "refresh.topics.enabled": "true",
    "refresh.topics.interval.seconds": "30",
    "refresh.groups.enabled": "true",
    "refresh.groups.interval.seconds": "30",

    "consumer.auto.offset.reset": "earliest",

    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}

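To update an existing connector, write the config as follows and save it as mm2-mirror-put.json. PUT /connectors/<name>/config takes only the flat config object, without the outer name/config wrapper: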

{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "tasks.max": "4",

  "topics": ".*",
  "topics.exclude": "^__.*",
  "groups": ".*",
  "groups.exclude": "^connect-.*",

  "source.cluster.alias": "v3",
  "target.cluster.alias": "v3b",
  "source.cluster.bootstrap.servers": "kafka-v3.dev.svc.cluster.local:9092",
  "target.cluster.bootstrap.servers": "kafka-v3b.dev.svc.cluster.local:9092",

  "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",

  "sync.topic.configs.enabled": "true",
  "sync.topic.acls.enabled": "true",

  "emit.heartbeats.enabled": "true",
  "emit.checkpoints.enabled": "true",
  "emit.heartbeats.interval.seconds": "5",
  "emit.checkpoints.interval.seconds": "60",

  "checkpoints.topic.replication.factor": "1",
  "heartbeats.topic.replication.factor": "1",
  "offset-syncs.topic.replication.factor": "1",
  "offset.syncs.topic.location": "target",

  "refresh.topics.enabled": "true",
  "refresh.topics.interval.seconds": "30",
  "refresh.groups.enabled": "true",
  "refresh.groups.interval.seconds": "30",

  "consumer.auto.offset.reset": "earliest",

  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

  "offset.flush.interval.ms": "60000",
  "offset.lag.max": "-1",

  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",

  "producer.max.request.size": "10485760",
  "producer.buffer.memory": "33554432",
  "producer.batch.size": "16384",
  "producer.compression.type": "gzip",
  "producer.retries": "2147483647",
  "producer.delivery.timeout.ms": "300000",
  "producer.request.timeout.ms": "30000",
  "producer.retry.backoff.ms": "1000"
}

Apply it:

curl -X PUT http://10.1.20.134:8083/connectors/mirror-v3-to-v3b/config \
  -H "Content-Type: application/json" \
  -d @/full/path/to/mm2-mirror-put.json
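Keeping mm2-mirror.json and mm2-mirror-put.json in sync by hand is error-prone, since they differ only in the outer {"name": ..., "config": ...} wrapper. A sketch that renders the flat config once and derives both request bodies from it; the function and variable names are hypothetical, and only a subset of the keys above is included, so extend the heredoc with the rest of your settings:

```shell
#!/usr/bin/env bash
# Hypothetical generator for the two request bodies used above.
SRC_ALIAS=${SRC_ALIAS:-v3}
TGT_ALIAS=${TGT_ALIAS:-v3b}
SRC_BOOTSTRAP=${SRC_BOOTSTRAP:-kafka-v3.dev.svc.cluster.local:9092}
TGT_BOOTSTRAP=${TGT_BOOTSTRAP:-kafka-v3b.dev.svc.cluster.local:9092}

# Flat config object: the body for PUT /connectors/<name>/config.
mm2_config() {
  cat <<EOF
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "tasks.max": "4",
  "topics": ".*",
  "topics.exclude": "^__.*",
  "source.cluster.alias": "${SRC_ALIAS}",
  "target.cluster.alias": "${TGT_ALIAS}",
  "source.cluster.bootstrap.servers": "${SRC_BOOTSTRAP}",
  "target.cluster.bootstrap.servers": "${TGT_BOOTSTRAP}",
  "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
EOF
}

# Body for POST /connectors: the same object wrapped with a connector name.
mm2_post_body() {
  printf '{"name": "%s", "config": %s}\n' "${1:-mirror-v3-to-v3b}" "$(mm2_config)"
}
```

Usage: mm2_post_body mirror-v3-to-v3b > mm2-mirror.json for a fresh deployment, and mm2_config > mm2-mirror-put.json for updates.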

To redeploy, first delete the connector, then create it again with the POST command in step 4:

curl -X DELETE http://localhost:8083/connectors/mirror-v3-to-v3b

Create the heartbeats connector (for health checks)

curl -X POST http://<connect-host>:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mirror-v3-to-v3b-heartbeats",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "source.cluster.alias": "v3",
    "target.cluster.alias": "v3b",
    "source.cluster.bootstrap.servers": "kafka-v3.dev.svc.cluster.local:9092",
    "target.cluster.bootstrap.servers": "kafka-v3b.dev.svc.cluster.local:9092",
    "emit.heartbeats.interval.seconds": "5",
    "topics": ".*",
    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
    "tasks.max": "1"
  }
}'

Check the heartbeats connector

curl http://10.1.20.87:8083/connectors/mirror-v3-to-v3b-heartbeats/tasks
curl http://10.1.20.87:8083/connectors/mirror-v3-to-v3b-heartbeats/tasks/0/status
kubectl logs deployment/kafka-connect -n kafka-mirror | grep mirror-v3-to-v3b-heartbeats

Once the new configuration is deployed, MirrorMaker runs with it.

Check:

curl http://10.1.20.134:8083/connectors/mirror-v3-to-v3b/status | jq '.'

4. Deploy MirrorMaker

# Deploy the connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mm2-mirror.json

# Check the status
curl http://localhost:8083/connectors/mirror-v3-to-v3b/status | jq '.'

📊 Verification and testing

1. Check the connector status

# List all connectors
curl http://localhost:8083/connectors | jq '.'

# Show the connector's detailed status
curl http://localhost:8083/connectors/mirror-v3-to-v3b/status | jq '.'

# Show the connector's configuration
curl http://localhost:8083/connectors/mirror-v3-to-v3b/config | jq '.'

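2. Test data replication

Create test data on the source Kafka cluster:

# Assuming your kafka-v3 cluster has a pod (add -n <namespace> if it is not in your current namespace)
kubectl exec -it <kafka-v3-pod-name> -- \
  kafka-console-producer.sh \
  --topic test-topic \
  --bootstrap-server localhost:9092

Check the replication result on the target Kafka cluster:

# List the replicated topics (with IdentityReplicationPolicy, as configured above,
# topic names are unchanged; there is no source-alias prefix)
kubectl exec -it <kafka-v3b-pod-name> -- \
  kafka-topics.sh --list \
  --bootstrap-server localhost:9092

# Consume the replicated messages
kubectl exec -it <kafka-v3b-pod-name> -- \
  kafka-console-consumer.sh \
  --topic test-topic \
  --bootstrap-server localhost:9092 \
  --from-beginning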
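Which topic name to look for on v3b depends on the replication policy. A small sketch of the two naming rules, based on my reading of Kafka's DefaultReplicationPolicy (source alias + "." + topic) and IdentityReplicationPolicy (name unchanged, except that heartbeat topics are still prefixed); remote_topic_name is a hypothetical helper and assumes the default "." separator unless you pass another:

```shell
#!/usr/bin/env bash
# Hypothetical helper: the name a source topic gets on the target cluster.
# Usage: remote_topic_name identity|default SOURCE_ALIAS TOPIC [SEPARATOR]
remote_topic_name() {
  local policy=$1 alias=$2 topic=$3 sep=${4:-.}
  case "$policy" in
    identity)
      # IdentityReplicationPolicy keeps names unchanged, but heartbeat
      # topics are still prefixed the way DefaultReplicationPolicy does it.
      if [[ $topic == *heartbeats ]]; then
        printf '%s%s%s\n' "$alias" "$sep" "$topic"
      else
        printf '%s\n' "$topic"
      fi
      ;;
    default)
      # DefaultReplicationPolicy: <source alias><separator><topic>
      printf '%s%s%s\n' "$alias" "$sep" "$topic"
      ;;
    *)
      echo "unknown policy: $policy" >&2
      return 2
      ;;
  esac
}
```

With this article's settings, remote_topic_name identity v3 test-topic prints test-topic; under the default policy it prints v3.test-topic, which is the name you would pass to --topic instead.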

🛠️ Management operations

# Pause replication
curl -X PUT http://localhost:8083/connectors/mirror-v3-to-v3b/pause

# Resume replication
curl -X PUT http://localhost:8083/connectors/mirror-v3-to-v3b/resume

# Restart the connector
curl -X POST http://localhost:8083/connectors/mirror-v3-to-v3b/restart

# Delete the connector
curl -X DELETE http://localhost:8083/connectors/mirror-v3-to-v3b
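If you run these often, a thin wrapper saves retyping the URLs. A sketch where mm2_request, mm2_ctl, and the CONNECT_URL variable are hypothetical names; the method and path for each action are the ones listed above, plus GET .../status from the verification section:

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the Kafka Connect REST calls above.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# Print "METHOD PATH" for an action; exit status 2 for unknown actions.
mm2_request() {
  local action=$1 name=$2
  case "$action" in
    pause)   echo "PUT /connectors/$name/pause" ;;
    resume)  echo "PUT /connectors/$name/resume" ;;
    restart) echo "POST /connectors/$name/restart" ;;
    delete)  echo "DELETE /connectors/$name" ;;
    status)  echo "GET /connectors/$name/status" ;;
    *)       echo "unknown action: $action" >&2; return 2 ;;
  esac
}

# Usage: mm2_ctl ACTION [CONNECTOR_NAME]
mm2_ctl() {
  local req method path
  req=$(mm2_request "$1" "${2:-mirror-v3-to-v3b}") || return
  read -r method path <<<"$req"
  curl -sS --fail -X "$method" "$CONNECT_URL$path"
}
```

For example, mm2_ctl pause, then mm2_ctl resume; mm2_ctl restart mirror-v3-to-v3b-heartbeats targets the heartbeats connector instead. Keeping the action table in a separate function means it can be checked without a running cluster.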

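🧹 Cleanup

# Option 1 cleanup (this also deletes the kafka-mirror namespace defined in the YAML)
kubectl delete -f kafka-connect-deploy.yaml

# Option 2 cleanup
helm uninstall mm2 -n kafka-mirror

# Common cleanup (if the namespace still exists)
kubectl delete namespace kafka-mirror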

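⚠️ Notes

  1. Network connectivity: make sure Kafka Connect can reach both Kafka clusters.
  2. Topic naming: with IdentityReplicationPolicy (used above), replicated topics keep their original names; with the default DefaultReplicationPolicy they get the source cluster alias as a prefix, e.g. v3.test-topic.
  3. Resources: adjust CPU and memory to match your data volume.
  4. Security: if Kafka uses authentication, add the corresponding security settings to the configuration.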
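For note 1, a quick way to test connectivity without installing extra tools is bash's /dev/tcp pseudo-device. A sketch, assuming bash and coreutils timeout are available wherever you run it (for example inside the Connect container); check_tcp is a hypothetical name, and a successful TCP connect only proves the port is reachable, not that Kafka authentication will succeed:

```shell
#!/usr/bin/env bash
# Succeeds if a TCP connection to HOST:PORT opens within TIMEOUT seconds.
# Relies on bash's /dev/tcp pseudo-device and coreutils `timeout`.
check_tcp() {
  local host=$1 port=$2 t=${3:-3}
  # Host and port are passed as positional args to avoid quoting issues.
  timeout "$t" bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$host" "$port" 2>/dev/null
}
```

For example, open a shell in the pod with kubectl exec -it deployment/kafka-connect -n kafka-mirror -- bash, define the function, and run check_tcp kafka-v3.dev.svc.cluster.local 9092 && check_tcp kafka-v3b.dev.svc.cluster.local 9092 && echo both reachable.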

🎉 Signs of success

If you see output like the following, the installation succeeded:

curl http://localhost:8083/connectors/mirror-v3-to-v3b/status

It should return something like:

{
  "name": "mirror-v3-to-v3b",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "kafka-connect:8083"
    }
  ]
}
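To check this in a script rather than by eye, the condition is that the connector and every task report RUNNING. A dependency-free sketch (all_running is a hypothetical name); it pulls the "state" fields out with grep, which works for Connect status responses like the one above but is less robust than a real JSON parser such as the jq already used in this guide:

```shell
#!/usr/bin/env bash
# Reads a Kafka Connect /connectors/<name>/status response on stdin and
# succeeds only if the connector and all of its tasks report RUNNING.
all_running() {
  local states
  # Strip whitespace so pretty-printed and compact JSON look the same,
  # then collect every "state":"..." value.
  states=$(tr -d '[:space:]' | grep -o '"state":"[A-Z_]*"' | cut -d'"' -f4)
  [ -n "$states" ] || return 1
  # Fail if any state (PAUSED, FAILED, UNASSIGNED, ...) is not RUNNING.
  ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}
```

Usage: curl -s http://localhost:8083/connectors/mirror-v3-to-v3b/status | all_running && echo OK. Note that a RUNNING connector with no tasks yet also passes, so you may want to check the task count separately.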

Option 1 is recommended because it is the most stable and does not depend on any third-party Helm repository!

When DR fails

You need to reinstall MM2 and reconfigure the connectors; replication will then start over from the beginning.

By tony

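A free-software enthusiast who enjoys thinking through all kinds of problems and wants to learn and try anything new, experimenting and researching with enthusiasm. There is no such thing as ultimate technology, only humility and continuous learning to sharpen one's expertise. This site was set up mainly to share knowledge and resources about systems and networking. Github http://stnet253.github.io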
