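Kafka MirrorMaker 2.0 Installation Guide
Because the popular Helm charts all have problems, this guide provides installation methods that actually work.
🚀 Option 1: Direct Kubernetes Deployment (Recommended)
This is the simplest and most reliable method, and it does not require Helm.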
Create kafka-connect-deploy.yaml:
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: kafka-mirror
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect
  namespace: kafka-mirror
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-connect
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
        - name: kafka-connect
          image: confluentinc/cp-kafka-connect:7.4.0
          ports:
            - containerPort: 8083
          env:
            # === Worker bootstrap (usually the target cluster) ===
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "kafka-v3b.dev.svc.cluster.local:9092"
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: "kafka-connect"
            - name: CONNECT_REST_PORT
              value: "8083"
            - name: CONNECT_GROUP_ID
              value: "mirror-cluster"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: "mm2-configs"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: "mm2-offsets"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: "mm2-status"
            - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
              value: "1"
            # === Converter settings ===
            - name: CONNECT_KEY_CONVERTER
              value: "org.apache.kafka.connect.converters.ByteArrayConverter"
            - name: CONNECT_VALUE_CONVERTER
              value: "org.apache.kafka.connect.converters.ByteArrayConverter"
            - name: CONNECT_HEADER_CONVERTER
              value: "org.apache.kafka.connect.converters.ByteArrayConverter"
            # === Plugin path ===
            - name: CONNECT_PLUGIN_PATH
              value: "/usr/share/java,/usr/share/confluent-hub-components"
            # === MirrorMaker 2 cluster settings ===
            # Note: these become worker properties such as
            # mirror.source.cluster.bootstrap.servers, which are not Kafka
            # Connect worker settings. The MirrorMaker connectors read their
            # cluster settings from the connector config in step 3.
            - name: CONNECT_MIRROR_SOURCE_CLUSTER_BOOTSTRAP_SERVERS
              value: "kafka-v3.dev.svc.cluster.local:9092"
            - name: CONNECT_MIRROR_TARGET_CLUSTER_BOOTSTRAP_SERVERS
              value: "kafka-v3b.dev.svc.cluster.local:9092"
            - name: CONNECT_HEARTBEAT_ENABLED
              value: "true"
            - name: CONNECT_CHECKPOINT_ENABLED
              value: "true"
            - name: CONNECT_MIRRORMAKER_TASKS_MAX
              value: "10"
            - name: CONNECT_MIRRORMAKER_POLL_TIMEOUT_MS
              value: "5000"
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /
              port: 8083
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /
              port: 8083
            initialDelaySeconds: 20
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect
  namespace: kafka-mirror
spec:
  selector:
    app: kafka-connect
  ports:
    - port: 8083
      targetPort: 8083
  type: ClusterIP
```
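The cp-kafka-connect image turns each `CONNECT_*` environment variable into a worker property. As a minimal sketch, assuming Confluent's documented naming convention for its Docker images (drop the `CONNECT_` prefix, lowercase, `_` becomes `.`, `__` becomes `-`, `___` becomes `_`), the hypothetical POSIX shell helper `env_to_prop` below reproduces that mapping so you can check which property a variable ends up as. Verify the rule against the documentation for your image version.

```shell
# Hypothetical helper (not part of the image): map a CONNECT_* environment
# variable name to the worker property it produces, assuming Confluent's
# documented convention for cp-* Docker images.
# Order matters: triple underscores are protected first with the placeholder
# "U" (safe because the name is lowercased before sed runs), then double
# underscores become dashes and single underscores become dots.
env_to_prop() {
  printf '%s\n' "${1#CONNECT_}" |
    tr '[:upper:]' '[:lower:]' |
    sed -e 's/___/U/g' -e 's/__/-/g' -e 's/_/./g' -e 's/U/_/g'
}
```

For example, `env_to_prop CONNECT_CONFIG_STORAGE_TOPIC` prints `config.storage.topic`, while `env_to_prop CONNECT_MIRROR_SOURCE_CLUSTER_BOOTSTRAP_SERVERS` prints `mirror.source.cluster.bootstrap.servers`, which is not a Kafka Connect worker setting.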
One-step deployment:
```shell
# Deploy
kubectl apply -f kafka-connect-deploy.yaml
# Check status
kubectl get pods -n kafka-mirror -w
# Once the pod is Running, follow the logs
kubectl logs -f deployment/kafka-connect -n kafka-mirror
```
🎯 Option 2: Use a Third-Party Helm Chart
Try the licenseware chart:
```shell
# Add the repo
helm repo add licenseware https://licenseware.github.io/charts/
helm repo update
# Inspect the chart
helm search repo kafka-connect
# Create values.yaml
cat > values.yaml << 'EOF'
image:
  repository: confluentinc/cp-kafka-connect
  tag: "7.4.0"
kafka:
  # Use the full service DNS name: the release runs in kafka-mirror,
  # while the Kafka service lives in the dev namespace
  bootstrapServers: "kafka-v3.dev.svc.cluster.local:9092"
connect:
  groupId: "connect-cluster"
  configStorageTopic: "connect-config"
  offsetStorageTopic: "connect-offset"
  statusStorageTopic: "connect-status"
resources:
  requests:
    memory: "1Gi"
    cpu: "500m"
EOF
# Install
helm install mm2 licenseware/kafka-connect \
  --namespace kafka-mirror \
  --create-namespace \
  --values values.yaml
```
🔧 Configure MirrorMaker 2.0
Whichever option you used, the following steps are the same:
1. Set up port forwarding
```shell
kubectl port-forward svc/kafka-connect 8083:8083 -n kafka-mirror &
```
2. Wait for the service to be ready
```shell
# Check whether Kafka Connect is ready
while ! curl -f http://localhost:8083/ >/dev/null 2>&1; do
  echo "Waiting for Kafka Connect to start..."
  sleep 5
done
echo "Kafka Connect is ready!"
```
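The loop above waits forever if the pod never comes up. Below is a minimal POSIX shell variant that gives up after a bounded number of attempts; `wait_for_connect` and its default values are assumptions for illustration, and like the original loop it only checks that the REST root (`/`) answers.

```shell
# Hypothetical helper: poll the Kafka Connect REST root until it answers,
# giving up after a fixed number of attempts instead of looping forever.
#   $1  base URL                  (default http://localhost:8083, the port-forward)
#   $2  max attempts              (default 60)
#   $3  seconds between attempts  (default 5)
wait_for_connect() {
  url="${1:-http://localhost:8083}"
  max="${2:-60}"
  delay="${3:-5}"
  attempt=1
  while [ "$attempt" -le "$max" ]; do
    # -f: fail on HTTP errors, -s: no progress output
    if curl -fs "$url/" >/dev/null 2>&1; then
      echo "Kafka Connect is ready (attempt $attempt)"
      return 0
    fi
    echo "Waiting for Kafka Connect... ($attempt/$max)"
    attempt=$((attempt + 1))
    sleep "$delay"
  done
  echo "Kafka Connect did not become ready after $max attempts" >&2
  return 1
}
```

Usage: `wait_for_connect || exit 1` at the top of a setup script stops it early instead of hanging.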
3. Create the MirrorMaker configuration
For a fresh deployment, create mm2-mirror.json:
```json
{
  "name": "mirror-v3-to-v3b",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": "4",
    "topics": ".*",
    "topics.exclude": "^__.*",
    "groups": ".*",
    "source.cluster.alias": "v3",
    "target.cluster.alias": "v3b",
    "source.cluster.bootstrap.servers": "kafka-v3.dev.svc.cluster.local:9092",
    "target.cluster.bootstrap.servers": "kafka-v3b.dev.svc.cluster.local:9092",
    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
    "sync.topic.configs.enabled": "true",
    "sync.topic.acls.enabled": "true",
    "emit.heartbeats.enabled": "true",
    "emit.checkpoints.enabled": "true",
    "emit.heartbeats.interval.seconds": "5",
    "emit.checkpoints.interval.seconds": "60",
    "checkpoints.topic.replication.factor": "1",
    "heartbeats.topic.replication.factor": "1",
    "offset-syncs.topic.replication.factor": "1",
    "offset.syncs.topic.location": "target",
    "refresh.topics.enabled": "true",
    "refresh.topics.interval.seconds": "30",
    "refresh.groups.enabled": "true",
    "refresh.groups.interval.seconds": "30",
    "consumer.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}
```
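`replication.policy.class` decides what a mirrored topic is called on the target cluster. The sketch below encodes the naming rule of the two built-in policies as we understand it: `DefaultReplicationPolicy` prefixes the topic with the source cluster alias plus `replication.policy.separator` (`.` by default), while `IdentityReplicationPolicy`, used in this config, keeps the original name. `remote_topic_name` is a hypothetical helper for illustration only.

```shell
# Hypothetical helper: the name a source topic gets on the target cluster.
#   $1  policy: "default" or "identity"
#   $2  source.cluster.alias (e.g. v3)
#   $3  topic name on the source cluster
#   $4  replication.policy.separator (default ".")
remote_topic_name() {
  policy="$1" src_alias="$2" topic="$3" sep="${4:-.}"
  case "$policy" in
    # IdentityReplicationPolicy: name unchanged
    identity) printf '%s\n' "$topic" ;;
    # DefaultReplicationPolicy: <alias><separator><topic>
    default) printf '%s%s%s\n' "$src_alias" "$sep" "$topic" ;;
    *) echo "unknown policy: $policy" >&2; return 1 ;;
  esac
}
```

With the config above, `remote_topic_name identity v3 test-topic` prints `test-topic`; with the default policy the same topic would appear on v3b as `v3.test-topic`.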
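To update an existing connector, write the config as a flat object, because `PUT /connectors/<name>/config` takes only the settings, without the `name`/`config` wrapper. This version also excludes `connect-*` consumer groups and adds producer tuning. Save it as mm2-mirror-put.json: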
```json
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "tasks.max": "4",
  "topics": ".*",
  "topics.exclude": "^__.*",
  "groups": ".*",
  "groups.exclude": "^connect-.*",
  "source.cluster.alias": "v3",
  "target.cluster.alias": "v3b",
  "source.cluster.bootstrap.servers": "kafka-v3.dev.svc.cluster.local:9092",
  "target.cluster.bootstrap.servers": "kafka-v3b.dev.svc.cluster.local:9092",
  "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
  "sync.topic.configs.enabled": "true",
  "sync.topic.acls.enabled": "true",
  "emit.heartbeats.enabled": "true",
  "emit.checkpoints.enabled": "true",
  "emit.heartbeats.interval.seconds": "5",
  "emit.checkpoints.interval.seconds": "60",
  "checkpoints.topic.replication.factor": "1",
  "heartbeats.topic.replication.factor": "1",
  "offset-syncs.topic.replication.factor": "1",
  "offset.syncs.topic.location": "target",
  "refresh.topics.enabled": "true",
  "refresh.topics.interval.seconds": "30",
  "refresh.groups.enabled": "true",
  "refresh.groups.interval.seconds": "30",
  "consumer.auto.offset.reset": "earliest",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "offset.flush.interval.ms": "60000",
  "offset.lag.max": "-1",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "producer.max.request.size": "10485760",
  "producer.buffer.memory": "33554432",
  "producer.batch.size": "16384",
  "producer.compression.type": "gzip",
  "producer.retries": "2147483647",
  "producer.delivery.timeout.ms": "300000",
  "producer.request.timeout.ms": "30000",
  "producer.retry.backoff.ms": "1000"
}
```
Apply the update:
```shell
curl -X PUT http://10.1.20.134:8083/connectors/mirror-v3-to-v3b/config \
  -H "Content-Type: application/json" \
  -d @/full/path/to/mm2-mirror-put.json
```
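`PUT /connectors/<name>/config` creates the connector if it does not exist and updates it otherwise, so a single call can cover both cases. The hypothetical helper `apply_config` below sketches this and reports the result from the HTTP status code, assuming the Connect REST API's usual 201 (created) and 200 (updated) responses; the default base URL assumes the port-forward from step 1.

```shell
# Hypothetical helper: create or update a connector with a flat config file.
#   $1  connector name
#   $2  path to a flat config JSON file (same shape as mm2-mirror-put.json)
#   $3  Connect REST base URL (default http://localhost:8083)
apply_config() {
  name="$1" file="$2" base="${3:-http://localhost:8083}"
  if [ ! -r "$file" ]; then
    echo "cannot read $file" >&2
    return 1
  fi
  # -o /dev/null discards the response body; -w prints only the status code
  code=$(curl -s -o /dev/null -w '%{http_code}' -X PUT \
    -H "Content-Type: application/json" \
    --data @"$file" \
    "$base/connectors/$name/config")
  case "$code" in
    201) echo "created $name" ;;
    200) echo "updated $name" ;;
    *) echo "PUT failed for $name (HTTP $code)" >&2; return 1 ;;
  esac
}
```

Usage: `apply_config mirror-v3-to-v3b mm2-mirror-put.json`.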
To redeploy, delete the connector first and then create it again (step 4):
```shell
curl -X DELETE http://localhost:8083/connectors/mirror-v3-to-v3b
```
Create a heartbeats connector for health checks:
```shell
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
  "name": "mirror-v3-to-v3b-heartbeats",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "source.cluster.alias": "v3",
    "target.cluster.alias": "v3b",
    "source.cluster.bootstrap.servers": "kafka-v3.dev.svc.cluster.local:9092",
    "target.cluster.bootstrap.servers": "kafka-v3b.dev.svc.cluster.local:9092",
    "emit.heartbeats.interval.seconds": "5",
    "topics": ".*",
    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
    "tasks.max": "1"
  }
}'
```
Check the heartbeats connector:
```shell
curl http://10.1.20.87:8083/connectors/mirror-v3-to-v3b-heartbeats/tasks
curl http://10.1.20.87:8083/connectors/mirror-v3-to-v3b-heartbeats/tasks/0/status
kubectl logs deployment/kafka-connect -n kafka-mirror | grep mirror-v3-to-v3b-heartbeats
```
After deploying the new configuration, check the MirrorMaker connector:
```shell
curl -s http://10.1.20.134:8083/connectors/mirror-v3-to-v3b/status | jq '.'
```
4. Deploy MirrorMaker
```shell
# Deploy the connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mm2-mirror.json
# Check status
curl -s http://localhost:8083/connectors/mirror-v3-to-v3b/status | jq '.'
```
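For scripts, it helps to turn the status document into a pass/fail result. The hypothetical helper `all_running` below reads the JSON from `GET /connectors/<name>/status` on stdin and succeeds only if the connector and every task report `RUNNING`. It is a sketch that uses grep/sed instead of jq so it runs on minimal images, and it assumes the status JSON contains no `"state"` keys other than the connector's and the tasks'.

```shell
# Hypothetical helper: succeed only if every "state" in a connector status
# document (read from stdin) is RUNNING; prints "<running>/<total> ...".
all_running() {
  states=$(grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/')
  if [ -z "$states" ]; then
    echo "no state fields found" >&2
    return 1
  fi
  total=$(printf '%s\n' "$states" | grep -c '')
  # grep -c exits 1 when it counts zero matches; || true keeps "0" usable
  running=$(printf '%s\n' "$states" | grep -c '^RUNNING$' || true)
  echo "$running/$total components RUNNING"
  [ "$running" -eq "$total" ]
}
```

Usage: `curl -s http://localhost:8083/connectors/mirror-v3-to-v3b/status | all_running`.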
📊 Verification and Testing
1. Check connector status
```shell
# List all connectors
curl -s http://localhost:8083/connectors | jq '.'
# Show detailed connector status
curl -s http://localhost:8083/connectors/mirror-v3-to-v3b/status | jq '.'
# Show connector configuration
curl -s http://localhost:8083/connectors/mirror-v3-to-v3b/config | jq '.'
```
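2. Test data replication
Produce test data on the source Kafka cluster:
```shell
# Assumes your kafka-v3 cluster runs in a pod in the dev namespace
# (the service DNS names above point to dev)
kubectl exec -it <kafka-v3-pod-name> -n dev -- \
  kafka-console-producer.sh \
  --topic test-topic \
  --bootstrap-server localhost:9092
```
Check the replication result on the target Kafka cluster:
```shell
# List topics. With IdentityReplicationPolicy the mirrored topic keeps its
# original name (no prefix); with the default policy it would be v3.test-topic
kubectl exec -it <kafka-v3b-pod-name> -n dev -- \
  kafka-topics.sh --list \
  --bootstrap-server localhost:9092
# Consume the mirrored messages
kubectl exec -it <kafka-v3b-pod-name> -n dev -- \
  kafka-console-consumer.sh \
  --topic test-topic \
  --bootstrap-server localhost:9092 \
  --from-beginning
```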
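Typing into the interactive producer is fine for a quick check, but for repeatable tests you can pipe messages in instead. The sketch below is a hypothetical POSIX shell helper that sends N numbered messages through `kubectl exec -i` (stdin only, no TTY). It assumes `kafka-console-producer.sh` is on the pod's PATH and the broker listens on `localhost:9092` inside the pod, as in the commands above.

```shell
# Hypothetical helper: send numbered test messages to a topic from inside a
# Kafka pod, without an interactive terminal.
#   $1  pod name   $2  namespace   $3  topic   $4  message count (default 10)
produce_test_messages() {
  pod="$1" ns="$2" topic="$3" count="${4:-10}"
  i=1
  # Generate "mm2-test-1" .. "mm2-test-N" and pipe them to the producer's stdin
  while [ "$i" -le "$count" ]; do
    echo "mm2-test-$i"
    i=$((i + 1))
  done | kubectl exec -i "$pod" -n "$ns" -- \
    kafka-console-producer.sh --topic "$topic" --bootstrap-server localhost:9092
}
```

For example, `produce_test_messages <kafka-v3-pod-name> dev test-topic 5` followed by the consumer command above should show `mm2-test-1` through `mm2-test-5` on v3b once the connector is running.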
🛠️ Management Operations
```shell
# Pause replication
curl -X PUT http://localhost:8083/connectors/mirror-v3-to-v3b/pause
# Resume replication
curl -X PUT http://localhost:8083/connectors/mirror-v3-to-v3b/resume
# Restart the connector
curl -X POST http://localhost:8083/connectors/mirror-v3-to-v3b/restart
# Delete the connector
curl -X DELETE http://localhost:8083/connectors/mirror-v3-to-v3b
```
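The plain `restart` call restarts only the connector instance, not its tasks. Workers based on Apache Kafka 3.0 or later (KIP-745; cp-kafka-connect 7.4.0 ships Kafka 3.4) also accept `includeTasks` and `onlyFailed` query parameters, so failed tasks can be restarted in one call. The wrapper below is a hypothetical helper sketching that request; the default base URL assumes the port-forward from step 1.

```shell
# Hypothetical helper: restart only the failed connector instance and tasks.
#   $1  connector name
#   $2  Connect REST base URL (default http://localhost:8083)
restart_failed() {
  name="$1" base="${2:-http://localhost:8083}"
  # -f: non-zero exit on HTTP errors, -sS: quiet but still print errors
  curl -fsS -X POST \
    "$base/connectors/$name/restart?includeTasks=true&onlyFailed=true"
}
```

Usage: `restart_failed mirror-v3-to-v3b`, then check the status again.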
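🧹 Cleanup
```shell
# Option 1 cleanup
kubectl delete -f kafka-connect-deploy.yaml
# Option 2 cleanup
helm uninstall mm2 -n kafka-mirror
# Common cleanup (Option 1 already deletes the namespace, hence --ignore-not-found)
kubectl delete namespace kafka-mirror --ignore-not-found
```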
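⚠️ Notes
- Network connectivity: make sure Kafka Connect can reach both Kafka clusters.
- Topic naming: with IdentityReplicationPolicy (as configured above), mirrored topics keep their source names; with DefaultReplicationPolicy, the MM2 default, they are prefixed with the source cluster alias (for example v3.test-topic).
- Resource sizing: adjust CPU and memory to your data volume.
- Security: if Kafka requires authentication, add the corresponding security settings to the configuration.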
🎉 Signs of Success
The installation succeeded if this command:
```shell
curl -s http://localhost:8083/connectors/mirror-v3-to-v3b/status
```
returns output like:
```json
{
  "name": "mirror-v3-to-v3b",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "kafka-connect:8083"
    }
  ]
}
```
Option 1 is recommended: it is the most stable and does not depend on any third-party Helm repository.
If DR Fails
Reinstall MM2 and reconfigure the connectors. Replication will then start over from the beginning.