본문으로 바로가기

[Kafka] 기본 카프카 명령어 (command-line tool)

category Backend/Kafka 2022. 1. 13. 13:10

0. 환경

  • CentOS7
  • JDK 11(Java 11)
  • Apache Kafka 3.0.0

1. kafka-topics.sh

  • Topic과 관련된 명령을 실행할 수 있는 쉘 스크립트 파일입니다.
  • Topic이란 RDBMS에서 사용하는 테이블과 비슷하다고 볼 수 있습니다.

 

토픽 생성

  • --create 옵션을 사용하여 Topic을 생성합니다. 
  • --bootstrap-server는 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 port를 작성합니다.
  • --replication-factor는 Topic의 파티션을 복제할 복제 개수를 작성합니다.(2이면 1개의 복제본을 사용한다는 의미)
  • --partitions는 파티션의 개수를 지정합니다.
  • --config는 추가적인 설정을 할 수 있습니다.(retention.ms는 데이터 유지기간(ms) 86400000ms = 1일
  • replication 1개, partition 1개로 kafka-test라는 토픽을 생성합니다.
bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--config retention.ms=86400000 \
--topic kafka-test

 

토픽 리스트 조회

  • --list 옵션을 사용하여 Topic 리스트를 조회할 수 있습니다.
  • --exclude-internal는 Kafka 내부 관리를 위한 Topic을 조회 대상에서 제외 합니다.
bin/kafka-topics.sh \
--list \
--bootstrap-server localhost:9092 \
--exclude-internal

 

토픽 상세 조회

  • --describe 옵션을 사용하여 토픽의 상세내역을 확인할 수 있습니다.
  • 파티션 개수, 복제된 파티션이 위치한 브로커의 번호, 기타 토픽을 구성하는 설정들을 출력합니다.
  • 토픽 별 토픽명, 존재하는 파티션 번호, 리더 브로커 번호, 복제 계수, ISR
  • 토픽의 리더 파티션이 일부 브로커에 몰려있는 경우 카프카 클러스터 부하가 특정 브로커들로만 몰릴 수 있으므로 성능 확인을 위해 상세 내역을 조회해 봅니다.
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--describe

Topic: kafka-test	TopicId: dIgRmdtJQpqv5y3s6cJE6w	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: kafka-test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: kafka-test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: kafka-test	Partition: 2	Leader: 0	Replicas: 0	Isr: 0

 

토픽 옵션 수정

  • 파티션 개수를 변경하려면 kafka-topics.sh를 사용해야 합니다.
  • 토픽 옵션 중 Dynamic topic config(log.segment.bytes, log.retention.ms 등)는 kafka-configs.sh를 통해 수정할 수 있습니다.
  • 주의할 점은 토픽의 파티션 수는 증가만 가능하고, 감소는 불가능합니다. (파티션의 수만큼 컨슈머 역시 추가해줘야 합니다.)
[partition 개수를 3개로 늘리기]
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--alter \
--partitions 3

[설정 값 조회]
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--describe

[retention.ms를 1일로 변경하기]
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name kafka-test \
--alter \
--add-config retention.ms=86400000

[retention.ms 옵션 삭제]
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name kafka-test \
--alter \
--delete-config retention.ms

[설정 값 조회]
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name kafka-test \
--describe

 

토픽 삭제

bin/kafka-topics.sh \
--delete \
--bootstrap-server localhost:9092 \
--topic kafka-test

 

★ Topic 삭제가 안될 경우

$ zookeeper-shell localhost:2181 
ls /brokers/topics 
토픽이름 토픽2 토픽3
deleteall /brokers/topics/test
ls /brokers/topics

2. kafka-console-producer.sh

  • 생성된 토픽에 데이터를 넣을 수 있는 console producer 실행 명령어입니다.
  • 토픽에 넣는 데이터는 레코드(record)라고 부르며 키(key)와 값(value)으로 이루어져 있습니다. 
  • 테스트 목적으로 String 타입으로만 브로커로 전송할 수 있습니다.
  • 전송되는 레코드 값은 UTF-8을 기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화됩니다.
  • Producer client 라이브러리를 이용해 직접 개발하여 튜닝할 수도 있습니다.
[value 형태의 데이터를 입력(key는 null로 입력됩니다.)]
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic kafka-test

[key, value 형태의 데이터를 입력]
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--property "parse.key=true" \
--property "key.separator=:"

3. kafka-console-consumer.sh

  • 브로커에 전송된 데이터를 폴링(polling)하여 데이터를 확인할 수 있는 console consumer 명령어입니다.
  • --from-beginning 옵션을 사용하면 가장 처음 offset 데이터부터 폴링(polling)합니다
[value 형태로 데이터를 받음]
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--from-beginning

[key-value 형태로 데이터를 받음]
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--property print.key=true \
--property key.separator="-" \
--from-beginning

[group을 생성하여 데이터를 받음 따라서 파티션 개수에 따라 데이터 순서가 보장되지 않을수도 있습니다.]
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--group testgroup \
--from-beginning

[토픽의 특정 파티션 폴링]
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--from-beginning \
--partition 0 \
--topic kafka-test

4. kafka-consumer-groups.sh

  • 컨슈머 그룹은 따로 생성하는 명령어 필요 없이 컨슈머 동작 시 컨슈머 그룹명을 지정하면 새로 생성됩니다. 
  • 그룹 정보는 kafka-consumer-groups.sh 명령어를 이용해 확인할 수 있습니다.

 

컨슈머 그룹 리스트 조회

  • --list 옵션을 사용하여 그룹의 리스트를 확인 할 수 있습니다. 
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list

 

그룹 상세 내용 조회

  • --decribe 옵션을 사용해 특정 그룹에 대한 상세 내용을 조회합니다.
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group testgroup \
--describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testgroup       kafka-test      1          259             259             0               -               -               -
testgroup       kafka-test      0          270             270             0               -               -               -
testgroup       kafka-test      2          301             301             0               -               -               -

 

그룹 삭제

bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--delete \
-group testgroup

 

토픽 오프셋 리셋

--to-earliest (가장 낮은 숫자의 오프셋)

파티션 번호와 특정 오프셋을 지정하여 리셋도 가능합니다.

bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group testgroup \
--topic kafka-test \
--reset-offsets \
--to-earliest \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
testgroup                      kafka-test                     0          0
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group testgroup \
--topic kafka-test:0 \
--reset-offsets \
--to-offset 9 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
testgroup                      kafka-test                     0          9

5. kafka-verifiable-producer

  • 네트워크 통신 테스트할 때 주로 사용하는 명령어입니다.(Producer)
  • --max-messages 옵션을 사용해 보낼 데이터 개수를 지정할 수 있습니다. (-1을 사용하면 종료될 때까지 계속 데이터를 보냅니다.)
  • 마지막 결과 줄을 보면 통계 값이 출력되는데 평균 처리량을 확인할 수 있습니다. 
bin/kafka-verifiable-producer.sh \
--bootstrap-server localhost:9092 \
--max-messages 100 \
--topic kafka-test

{"timestamp":1641996958595,"name":"startup_complete"}
{"timestamp":1641996958864,"name":"producer_send_success","key":null,"value":"0","offset":359,"topic":"kafka-test","partition":1}
. . . (생략)
{"timestamp":1641996958887,"name":"shutdown_complete"}
{"timestamp":1641996958888,"name":"tool_data","sent":100,"acked":100,"target_throughput":-1,"avg_throughput":323.62459546925567}

6. kafka-verifiable-consumer

  • 네트워크 통신 테스트할 때 주로 사용하는 명령어입니다.(Consumer)
  • 결괏값을 보면 verifiable Producer가 보낸 데이터를 정상적으로 받았는지 그리고 offset 커밋 여부를 확인할 수 있습니다.
bin/kafka-verifiable-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafka-test \
--group-id testgroup

{"timestamp":1641996994729,"name":"startup_complete"}
{"timestamp":1641996995077,"name":"partitions_assigned","partitions":[{"topic":"kafka-test","partition":1},{"topic":"kafka-test","partition":2},{"topic":"kafka-test","partition":0}]}
{"timestamp":1641996995133,"name":"records_consumed","count":100,"partitions":[{"topic":"kafka-test","partition":1,"count":100,"minOffset":359,"maxOffset":458}]}
{"timestamp":1641996995139,"name":"offsets_committed","offsets":[{"topic":"kafka-test","partition":1,"offset":459}],"success":true}

7. kafka-delete-records.sh

  • 토픽에 저장된 데이터를 지울 수 있는 쉘 스크립트입니다.
  • 가장 오래된 데이터부터 특정 시점의 오프셋 데이터까지 삭제할 수 있습니다.
  • json 파일을 생성 후 명령어를 이용해 삭제 할 수 있습니다.
  • "topic": "[삭제할 레코드가 있는 토픽명]", "partition": [삭제할 레코드가 있는 파티션 번호], "offset": [처음부터 삭제할 offset 번호]
  • "offset": 4 -> offset 0 ~ 3 삭제
[1. json 파일을 생성 및 작성합니다.]
vi delete-topic.json

{
  "partitions": [
    {
      "topic": "kafka-test",
      "partition": 0,
      "offset": 4
    }
  ],
  "version": 1
}

[2. 명령어를 사용해서 레코드를 삭제합니다.]
bin/kafka-delete-records.sh \
--bootstrap-server my-kafka:9092 \
--offset-json-file delete-topic.json