본문으로 바로가기

[Spring for Apache Kafka] Apache Kafka Producer

category Backend/Kafka 2022. 1. 9. 22:16

0. 환경

  • Spring Boot 2.6.2 (Gradle)
  • JDK 11(Java 11)
  • IntelliJ
  • Postman
  • Apache Kafka 3.0.0

[build.gradle dependencies]

dependencies {
   implementation 'org.springframework.boot:spring-boot-starter-web'
   implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.2.1'
   implementation 'org.springframework.kafka:spring-kafka'
   compileOnly 'org.projectlombok:lombok'
   annotationProcessor 'org.projectlombok:lombok'
   testImplementation 'org.springframework.boot:spring-boot-starter-test'
   testImplementation 'org.springframework.kafka:spring-kafka-test'
}

1. Spring Boot Project 생성

1. start.spring.io 사이트에서 다음과 같이 프로젝트 Dependencies를 추가하여 생성합니다.

2. 그 후 압축을 풀어 인텔리제이 프로젝트로 열어줍니다.

(처음에는 Dependencies를 다운로드하느라 시간이 소요됩니다.)

2. 설정 파일

Kafka Broker와의 연동을 위해 application.propertie 파일을 수정합니다.

#Kafka Producer
spring.kafka.producer.bootstrap-servers=yourKafka:port
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1

3. Producer 서비스 생성

KafkaTemplate 클래스를 활용해 Kafka Broker로 메시지를 전달(저장)합니다.

KafkaTemplate 자세히 살펴보는 것을 추천합니다.

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Log4j2
@Service
@RequiredArgsConstructor
public class TestProducer {

    @Value("${test.topic}")
    private String TOPIC;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        log.info(String.format("Produce message(test) : %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

4. 컨트롤러 생성

컨트롤러로 요청을 받아 Producer 서비스를 활용해 Kafka Broker로 데이터를 전달합니다.

import kamp.ai.producer.controller.kafka.producer.TestProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class KafkaController {

    private final TestProducer testProducer;

    @PostMapping(value = "/test")
    public void TestSendMessage(@RequestBody(required = true) String message) {
        this.testProducer.sendMessage(message);
    }
}

5. 테스트

1. Kafka 실행

카프카 실행경로인 bin 디렉터리로 이동해 다음 명령어로 카프카를 실행해줍니다.

1. 주키퍼 실행
# ./zookeeper-server-start.sh -daemon /usr/local/src/kafka_2.13-3.0.0/config/zookeeper.properties

2. 카프카 실행
# ./kafka-server-start.sh -daemon /usr/local/src/kafka_2.13-3.0.0/config/server.properties

 

 

2. Topic 생성

partitions이 1개이고 replication이 1개인 test Topic을 생성합니다.

3. Topic 생성
# ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

4. Topic List 확인
./kafka-topics.sh --list --bootstrap-server localhost:9092

 

3. Console Consumer 실행

위에서 작성한 Spring Boot Producer가 데이터를 Kafka Broker로 잘 전달하는지 확인을 위해 데이터를 Polling 작업을 할 수 있는 console consumer 실행합니다.

5. console consumer
# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

 

4. Spring Boot 실행

Spring Boot Producer Project를 인텔리제이를 이용해 실행합니다.

 

5. Post Man을 이용해 해당 컨트롤러로 url경로로 데이터를 보내봅니다.

Console Consumer에서 Kafka Broker의 데이터를 폴링 하고 있는 것을 확인할 수 있습니다.