0. 환경
- CentOS7
- JDK 11(Java 11)
- Apache Kafka 3.0.0
1. kafka-topics.sh
- Topic과 관련된 명령을 실행할 수 있는 쉘 스크립트 파일입니다.
- Topic이란 RDBMS에서 사용하는 테이블과 비슷하다고 볼 수 있습니다.
토픽 생성
- --create 옵션을 사용하여 Topic을 생성합니다.
- --bootstrap-server는 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 port를 작성합니다.
- --replication-factor는 Topic의 파티션을 복제할 복제 개수를 작성합니다.(2이면 1개의 복제본을 사용한다는 의미)
- --partitions는 파티션의 개수를 지정합니다.
- --config는 추가적인 설정을 할 수 있습니다.(retention.ms는 데이터 유지기간(ms) 86400000ms = 1일
- replication 1개, partition 1개로 kafka-test라는 토픽을 생성합니다.
bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--config retention.ms=86400000 \
--topic kafka-test
토픽 리스트 조회
- --list 옵션을 사용하여 Topic 리스트를 조회할 수 있습니다.
- --exclude-internal는 Kafka 내부 관리를 위한 Topic을 조회 대상에서 제외 합니다.
bin/kafka-topics.sh \
--list \
--bootstrap-server localhost:9092 \
--exclude-internal
토픽 상세 조회
- --describe 옵션을 사용하여 토픽의 상세내역을 확인할 수 있습니다.
- 파티션 개수, 복제된 파티션이 위치한 브로커의 번호, 기타 토픽을 구성하는 설정들을 출력합니다.
- 토픽 별 토픽명, 존재하는 파티션 번호, 리더 브로커 번호, 복제 계수, ISR
- 토픽의 리더 파티션이 일부 브로커에 몰려있는 경우 카프카 클러스터 부하가 특정 브로커들로만 몰릴 수 있으므로 성능 확인을 위해 상세 내역을 조회해 봅니다.
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--describe
Topic: kafka-test TopicId: dIgRmdtJQpqv5y3s6cJE6w PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: kafka-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka-test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka-test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
토픽 옵션 수정
- 파티션 개수를 변경하려면 kafka-topics.sh를 사용해야 합니다.
- 토픽 옵션 중 Dynamic topic config(log.segment.bytes, log.retention.ms 등)는 kafka-configs.sh를 통해 수정할 수 있습니다.
- 주의할 점은 토픽의 파티션 수는 증가만 가능하고, 감소는 불가능합니다. (파티션의 수만큼 컨슈머 역시 추가해줘야 합니다.)
[partition 개수를 3개로 늘리기]
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--alter \
--partitions 3
[설정 값 조회]
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--describe
[retention.ms를 1일로 변경하기]
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name kafka-test \
--alter \
--add-config retention.ms=86400000
[retention.ms 옵션 삭제]
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name kafka-test \
--alter \
--delete-config retention.ms
[설정 값 조회]
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name kafka-test \
--describe
토픽 삭제
bin/kafka-topics.sh \
--delete \
--bootstrap-server localhost:9092 \
--topic kafka-test
★ Topic 삭제가 안될 경우
$ zookeeper-shell localhost:2181
ls /brokers/topics
토픽이름 토픽2 토픽3
deleteall /brokers/topics/test
ls /brokers/topics
2. kafka-console-producer.sh
- 생성된 토픽에 데이터를 넣을 수 있는 console producer 실행 명령어입니다.
- 토픽에 넣는 데이터는 레코드(record)라고 부르며 키(key)와 값(value)으로 이루어져 있습니다.
- 테스트 목적으로 String 타입으로만 브로커로 전송할 수 있습니다.
- 전송되는 레코드 값은 UTF-8을 기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화됩니다.
- Producer client 라이브러리를 이용해 직접 개발하여 튜닝할 수도 있습니다.
[value 형태의 데이터를 입력(key는 null로 입력됩니다.)]
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic kafka-test
[key, value 형태의 데이터를 입력]
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--property "parse.key=true" \
--property "key.separator=:"
3. kafka-console-consumer.sh
- 브로커에 전송된 데이터를 폴링(polling)하여 데이터를 확인할 수 있는 console consumer 명령어입니다.
- --from-beginning 옵션을 사용하면 가장 처음 offset 데이터부터 폴링(polling)합니다
[value 형태로 데이터를 받음]
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--from-beginning
[key-value 형태로 데이터를 받음]
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--property print.key=true \
--property key.separator="-" \
--from-beginning
[group을 생성하여 데이터를 받음 따라서 파티션 개수에 따라 데이터 순서가 보장되지 않을수도 있습니다.]
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--group testgroup \
--from-beginning
[토픽의 특정 파티션 폴링]
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--from-beginning \
--partition 0 \
--topic kafka-test
4. kafka-consumer-groups.sh
- 컨슈머 그룹은 따로 생성하는 명령어 필요 없이 컨슈머 동작 시 컨슈머 그룹명을 지정하면 새로 생성됩니다.
- 그룹 정보는 kafka-consumer-groups.sh 명령어를 이용해 확인할 수 있습니다.
컨슈머 그룹 리스트 조회
- --list 옵션을 사용하여 그룹의 리스트를 확인 할 수 있습니다.
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
그룹 상세 내용 조회
- --decribe 옵션을 사용해 특정 그룹에 대한 상세 내용을 조회합니다.
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group testgroup \
--describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup kafka-test 1 259 259 0 - - -
testgroup kafka-test 0 270 270 0 - - -
testgroup kafka-test 2 301 301 0 - - -
그룹 삭제
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--delete \
-group testgroup
토픽 오프셋 리셋
--to-earliest (가장 낮은 숫자의 오프셋)
파티션 번호와 특정 오프셋을 지정하여 리셋도 가능합니다.
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group testgroup \
--topic kafka-test \
--reset-offsets \
--to-earliest \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
testgroup kafka-test 0 0
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group testgroup \
--topic kafka-test:0 \
--reset-offsets \
--to-offset 9 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
testgroup kafka-test 0 9
5. kafka-verifiable-producer
- 네트워크 통신 테스트할 때 주로 사용하는 명령어입니다.(Producer)
- --max-messages 옵션을 사용해 보낼 데이터 개수를 지정할 수 있습니다. (-1을 사용하면 종료될 때까지 계속 데이터를 보냅니다.)
- 마지막 결과 줄을 보면 통계 값이 출력되는데 평균 처리량을 확인할 수 있습니다.
bin/kafka-verifiable-producer.sh \
--bootstrap-server localhost:9092 \
--max-messages 100 \
--topic kafka-test
{"timestamp":1641996958595,"name":"startup_complete"}
{"timestamp":1641996958864,"name":"producer_send_success","key":null,"value":"0","offset":359,"topic":"kafka-test","partition":1}
. . . (생략)
{"timestamp":1641996958887,"name":"shutdown_complete"}
{"timestamp":1641996958888,"name":"tool_data","sent":100,"acked":100,"target_throughput":-1,"avg_throughput":323.62459546925567}
6. kafka-verifiable-consumer
- 네트워크 통신 테스트할 때 주로 사용하는 명령어입니다.(Consumer)
- 결괏값을 보면 verifiable Producer가 보낸 데이터를 정상적으로 받았는지 그리고 offset 커밋 여부를 확인할 수 있습니다.
bin/kafka-verifiable-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--group-id testgroup
{"timestamp":1641996994729,"name":"startup_complete"}
{"timestamp":1641996995077,"name":"partitions_assigned","partitions":[{"topic":"kafka-test","partition":1},{"topic":"kafka-test","partition":2},{"topic":"kafka-test","partition":0}]}
{"timestamp":1641996995133,"name":"records_consumed","count":100,"partitions":[{"topic":"kafka-test","partition":1,"count":100,"minOffset":359,"maxOffset":458}]}
{"timestamp":1641996995139,"name":"offsets_committed","offsets":[{"topic":"kafka-test","partition":1,"offset":459}],"success":true}
7. kafka-delete-records.sh
- 토픽에 저장된 데이터를 지울 수 있는 쉘 스크립트입니다.
- 가장 오래된 데이터부터 특정 시점의 오프셋 데이터까지 삭제할 수 있습니다.
- json 파일을 생성 후 명령어를 이용해 삭제 할 수 있습니다.
- "topic": "[삭제할 레코드가 있는 토픽명]", "partition": [삭제할 레코드가 있는 파티션 번호], "offset": [처음부터 삭제할 offset 번호]
- "offset": 4 -> offset 0 ~ 3 삭제
[1. json 파일을 생성 및 작성합니다.]
vi delete-topic.json
{
"partitions": [
{
"topic": "kafka-test",
"partition": 0,
"offset": 4
}
],
"version": 1
}
[2. 명령어를 사용해서 레코드를 삭제합니다.]
bin/kafka-delete-records.sh \
--bootstrap-server my-kafka:9092 \
--offset-json-file delete-topic.json
'Backend > Kafka' 카테고리의 다른 글
[Kafka] 카프카 Burrow 설치 (카프카 모니터링) (0) | 2022.01.29 |
---|---|
[Spring for Apache Kafka] Consumer Mysql Insert (0) | 2022.01.11 |
[Spring for Apache Kafka] Error while fetching metadata with correlation id (0) | 2022.01.10 |
[Spring for Apache Kafka] Apache Kafka Producer (0) | 2022.01.09 |
[Kafka] Linux Kafka 설치 (Apache Kafka 3.0.0) (0) | 2022.01.08 |