[Spring for Apache Kafka] Consumer MySQL Insert

category Backend/Kafka 2022. 1. 11. 21:39

0. Environment

  • Spring Boot 2.6.2 (Gradle)
  • JDK 11(Java 11)
  • IntelliJ
  • Postman
  • Apache Kafka 3.0.0

[build.gradle dependencies]

dependencies {
   implementation 'org.springframework.boot:spring-boot-starter'
   implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.2.1'
   implementation 'org.springframework.kafka:spring-kafka'
   compileOnly 'org.projectlombok:lombok'
   runtimeOnly 'mysql:mysql-connector-java'
   annotationProcessor 'org.projectlombok:lombok'
   testImplementation 'org.springframework.boot:spring-boot-starter-test'
   testImplementation 'org.springframework.kafka:spring-kafka-test'
}

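1. Creating the Spring Boot Project

1. On start.spring.io, create a project with the dependencies shown in the build.gradle listing above.

2. Unzip the downloaded archive and open it as a project in IntelliJ.

(The first time you open it, downloading the dependencies takes a while.)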

2. Configuration File

To connect to the Kafka broker and to MySQL, edit the application.properties file.

#Consumer
spring.kafka.consumer.bootstrap-servers=<kafka-ip>:<kafka-port>
spring.kafka.consumer.group-id=<group-id>
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#MYSQL DB
spring.datasource.hikari.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.hikari.jdbc-url=jdbc:mysql://<db-host>:<db-port>/kamp_ai?useSSL=false&serverTimezone=UTC
spring.datasource.hikari.username=<username>
spring.datasource.hikari.password=<password>
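
A stray token in the query string of the JDBC URL is easy to overlook, so it can help to split the URL into its parameters and check them before starting the application. The following is only a rough sketch: `JdbcUrlParams` is a hypothetical helper (it is not part of Spring, HikariCP, or the MySQL driver), and `localhost:3306` is an assumed host and port used purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of any library: splits the query string of a
// JDBC URL into key/value pairs so that stray tokens are easy to spot.
final class JdbcUrlParams {

    static Map<String, String> parse(String jdbcUrl) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = jdbcUrl.indexOf('?');
        if (q < 0 || q == jdbcUrl.length() - 1) {
            return params; // the URL has no query string
        }
        for (String pair : jdbcUrl.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue; // skip empty segments such as "a=1&&b=2"
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, null); // a token without a value, worth a second look
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        // Assumed host and port; replace with your own database address.
        String url = "jdbc:mysql://localhost:3306/kamp_ai?useSSL=false&serverTimezone=UTC";
        System.out.println(parse(url)); // prints {useSSL=false, serverTimezone=UTC}
    }
}
```

Any key that maps to `null` in the result is a token without a value and is usually a sign of a typo in the URL.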

3. MyBatis Configuration Class

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

import javax.sql.DataSource;

@Configuration
@PropertySource("classpath:/application.properties")
public class DBConfiguration {

    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    @ConfigurationProperties(prefix="spring.datasource.hikari")
    public HikariConfig hikariConfig() {
        return new HikariConfig();
    }

    @Bean
    public DataSource dataSource() throws Exception {
        DataSource dataSource = new HikariDataSource(hikariConfig());
        return dataSource;
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        sqlSessionFactoryBean.setMapperLocations(applicationContext.getResources("classpath:/mapper/*.xml")); // location of the mapper XML files
        sqlSessionFactoryBean.setTypeAliasesPackage("consumer.domain"); // package scanned for DTO/domain type aliases
        return sqlSessionFactoryBean.getObject();
    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

4. Creating the Domain Class

import lombok.Data;

@Data
public class Test {
    private String text;
}

5. Creating Mapper.xml

The namespace in <mapper namespace="consumer.mapper.TestMapper"> must match the fully qualified name of the mapper interface.

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="consumer.mapper.TestMapper">
    <insert id="testInsert" parameterType="Test">
        INSERT INTO test
        VALUES (#{text})
    </insert>
</mapper>
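
By default MyBatis runs a statement like this as a prepared statement, with each `#{...}` placeholder bound as a parameter rather than concatenated into the SQL string. The sketch below only illustrates that idea with a regular expression; `PlaceholderSketch` is a hypothetical class written for this post and is not how MyBatis actually parses mapper files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Conceptual illustration only (hypothetical class, not MyBatis internals):
// rewrites #{name} placeholders to JDBC-style '?' markers and records the
// property names in the order they would be bound.
final class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("#\\{\\s*(\\w+)\\s*\\}");

    final String sql;
    final List<String> paramNames;

    private PlaceholderSketch(String sql, List<String> paramNames) {
        this.sql = sql;
        this.paramNames = paramNames;
    }

    static PlaceholderSketch of(String mapperSql) {
        List<String> names = new ArrayList<>();
        Matcher m = PLACEHOLDER.matcher(mapperSql);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            names.add(m.group(1));        // property read from the parameter object
            m.appendReplacement(out, "?"); // positional marker in the prepared statement
        }
        m.appendTail(out);
        return new PlaceholderSketch(out.toString(), names);
    }

    public static void main(String[] args) {
        PlaceholderSketch p = PlaceholderSketch.of("INSERT INTO test VALUES (#{text})");
        System.out.println(p.sql);        // prints INSERT INTO test VALUES (?)
        System.out.println(p.paramNames); // prints [text]
    }
}
```

Here `text` corresponds to the `text` field of the `Test` domain class, which is why the parameter object passed to `testInsert` needs a matching getter.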

6. Creating the Mapper Interface

import consumer.domain.Test;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TestMapper {

    // Inserts one row into the test table (statement id "testInsert" in the mapper XML)
    void testInsert(Test test) throws Exception;
}

7. Creating the Kafka Consumer

With @KafkaListener, a consumer can be created with very little code.

import consumer.domain.Test;
import consumer.mapper.TestMapper;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Log4j2
public class KafkaConsumer {

    @Autowired
    TestMapper testMapper;

    @KafkaListener(topics = "kafka-test", groupId = "testgroup2")
    public void consume(String message) throws Exception {
        Test test = new Test();
        test.setText(message);
        testMapper.testInsert(test);
    }
}
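
The listener body does three things: it wraps the message in a domain object, sets the text, and hands it to the mapper. That flow can be tried without a broker or a database. The sketch below is a plain-Java stand-in under stated assumptions: `TestRow`, `InsertMapper`, and `ConsumeLogic` are hypothetical names that mirror `Test`, `TestMapper`, and `KafkaConsumer`, with the Spring, Lombok, and MyBatis parts left out.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the Lombok-generated domain class from section 4 (hypothetical name).
final class TestRow {
    private String text;

    String getText() { return text; }

    void setText(String text) { this.text = text; }
}

// Stand-in for TestMapper; the real interface is implemented by MyBatis.
interface InsertMapper {
    void testInsert(TestRow row);
}

// Same steps as the listener method, minus the Spring annotations.
final class ConsumeLogic {
    private final InsertMapper mapper;

    ConsumeLogic(InsertMapper mapper) {
        this.mapper = mapper;
    }

    void consume(String message) {
        TestRow row = new TestRow();
        row.setText(message);   // the raw Kafka message becomes the column value
        mapper.testInsert(row); // one insert per consumed message
    }
}

final class ConsumeLogicDemo {
    public static void main(String[] args) {
        List<String> inserted = new ArrayList<>(); // in-memory substitute for the table
        ConsumeLogic logic = new ConsumeLogic(row -> inserted.add(row.getText()));
        logic.consume("hello");
        logic.consume("kafka");
        System.out.println(inserted); // prints [hello, kafka]
    }
}
```

Passing the mapper through the constructor, as in this sketch, is one way to make the same logic easy to exercise in a unit test; the original class uses field injection instead.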

8. Creating the Kafka Topic

Create the topic with the following command.

# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafka-test

9. Creating the MySQL Table

CREATE TABLE `test` (
	`text` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8_general_ci'
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;
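
Because the column is VARCHAR(50), a message longer than 50 characters will not be stored intact; depending on the server's SQL mode, MySQL either rejects the insert or truncates the value with a warning. If that matters, the length can be checked before calling the mapper. This is only a sketch, and `ColumnFit` is a hypothetical helper that does not appear in the project.

```java
// Hypothetical helper: checks a message against the 50-character limit of
// the `text` column before it is handed to the mapper.
final class ColumnFit {

    static final int TEXT_MAX_CHARS = 50; // mirrors VARCHAR(50) in the table definition

    // Counts Unicode code points rather than UTF-16 code units.
    static boolean fits(String value) {
        return value == null
                || value.codePointCount(0, value.length()) <= TEXT_MAX_CHARS;
    }

    // Returns the value unchanged if it fits, otherwise its first 50 code points.
    static String truncate(String value) {
        if (fits(value)) {
            return value;
        }
        int end = value.offsetByCodePoints(0, TEXT_MAX_CHARS);
        return value.substring(0, end);
    }

    public static void main(String[] args) {
        String longMessage = "x".repeat(60);
        System.out.println(fits(longMessage));              // prints false
        System.out.println(truncate(longMessage).length()); // prints 50
    }
}
```

Whether to truncate, skip, or fail on an oversized message is a design decision the original post does not address; the sketch shows only the check itself.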

10. Testing with the Kafka Console Producer

Start the Console Producer with the following command.

# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka-test

11. Test

Start the Spring Boot application, then type some messages into the Console Producer.

Console Producer
MySQL result