카프카 클러스터의 모니터링하는 방법은 대표적으로 2가지가 있다.
첫번째는 JMX 사용하여 모니터링하는 방법이 있고,
두번째는 야후에서 공개한 오픈소스인 Kafka-Manager이다.
이번 글에서는 Kafka-Manager 설치하는 방법에 대해서 알아본다.
1. 카프카 매니저 다운로드
https://github.com/yahoo/CMAK/releases
3.X.X.X 버전은 JAVA 11부터 되는 것 같아서 안전하게 CMAK-1.3.3.23.tar 다운로드하였다.
2. 다운로드 후 압축 해제
$ tar -xzvf CMAK-1.3.3.23.tar.gz
3. sbt 실행
sbt : Simple Build Tool, 스칼라를 위한 빌드 툴
압축해제된 폴더에서 sbt 실행
$ cd CMAK-1.3.3.23/
$ ./sbt clean dist
해당 명령어 실행 후 완료되기까지 시간이 쫌 걸린다!
모두 실행되고 아래와 같이 출력되면 완료
[info] Packaging /mnt/c/kafka/CMAK-1.3.3.23/target/scala-2.11/kafka-manager_2.11-1.3.3.23-sans-externalized.jar ...
[info] Done packaging.
[info]
[info] Your package is ready in /mnt/c/kafka/CMAK-1.3.3.23/target/universal/kafka-manager-1.3.3.23.zip
[info] [success] Total time: 633 s, completed Apr 21, 2020 10:58:27 AM
4. kafka-manager-1.3.3.23.zip 파일 압축 해제
sbt 실행 후 만들어진 압축폴더를 적당한 위치에 압축을 풀어준다.
압축폴더의 경로는 sbt 실행후 성공했다는 로그를 참고하면 된다
로그
[info] Your package is ready in /mnt/c/kafka/CMAK-1.3.3.23/target/universal/kafka-manager-1.3.3.23.zip
압축 해제 실행
$ unzip -d kafka kafka-manager-1.3.3.23.zip
5. config/application.conf 파일 수정
압축을 풀고, 해당 폴더 안에서 config/application.conf 파일을 수정한다.
수정 항목은 zookeeper 정보 입력!
자세한 것은 CMAK git hub 참고
application.conf 편집
$ vi conf/application.conf
kafka-manager.zkhosts의 URL 수정
# ~~~~~
# Secret key
# ~~~~~
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=${?APPLICATION_SECRET}
# The application languages
# ~~~~~
play.i18n.langs=["en"]
play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader
kafka-manager.zkhosts="127.0.0.1:2181"
kafka-manager.zkhosts=${?ZK_HOSTS}
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
}
akka.logger-startup-timeout = 60s
basicAuthentication.enabled=false
basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED}
basicAuthentication.ldap.enabled=false
basicAuthentication.ldap.enabled=${?KAFKA_MANAGER_LDAP_ENABLED}
basicAuthentication.ldap.server=""
basicAuthentication.ldap.server=${?KAFKA_MANAGER_LDAP_SERVER}
basicAuthentication.ldap.port=389
basicAuthentication.ldap.port=${?KAFKA_MANAGER_LDAP_PORT}
basicAuthentication.ldap.username=""
수정 사항 저장
6. kafka-server-start.sh 에서 JMX_PORT 추가
아래와 같이 kafka-server-start.sh 파일에 'export JMX_PORT=9999' 추가
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# add JAM_PORT
export JMX_PORT=9999
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
....................
7. kafka-manager 실행
bin폴더 안에 kafka-manager 실행
$ bin/kafka-manager
정상적으로 실행되면 아래와 같이 출력된다.
[info] o.a.z.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@10452de8
[info] k.m.a.KafkaManagerActor - zk=127.0.0.1:2181
[info] k.m.a.KafkaManagerActor - baseZkPath=/kafka-manager
[info] o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[info] o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
[info] o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x100031c7e090008, negotiated timeout = 60000
[info] k.m.a.DeleteClusterActor - Started actor akka://kafka-manager-system/user/kafka-manager/delete-cluster
[info] k.m.a.DeleteClusterActor - Starting delete clusters path cache...
[info] k.m.a.KafkaManagerActor - Started actor akka://kafka-manager-system/user/kafka-manager
[info] k.m.a.KafkaManagerActor - Starting delete clusters path cache...
[info] k.m.a.KafkaManagerActor - Starting kafka manager path cache...
[info] k.m.a.DeleteClusterActor - Adding kafka manager path cache listener...
[info] k.m.a.DeleteClusterActor - Scheduling updater for 10 seconds
[info] k.m.a.KafkaManagerActor - Adding kafka manager path cache listener...
[info] play.api.Play - Application started (Prod)
[info] p.c.s.NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
[info] k.m.a.KafkaManagerActor - Updating internal state...
[info] k.m.a.KafkaManagerActor - Updating internal state...
[info] k.m.a.KafkaManagerActor - Updating internal state...
8. 웹 브라우저에서 카프카 매니저 확인
config/application.conf 에서 URL을 127.0.0.1:9000 으로 설정했으므로
웹 사이트에 해당 주소를 입력하고 접속한다.
아직 클러스를 동록하지 않아 Cluster list에는 아무것도 없다.
kafka manager 클러스터 등록은 여기 참조.
'Programming > Kafka' 카테고리의 다른 글
[kafka] docker kafka image (1) | 2020.04.29 |
---|---|
[kafka] 자주 사용하는카프카 명령어 (2) | 2020.04.23 |
[kafka] Consumer Connection Config (0) | 2020.04.22 |