0. 환경
- Spring Boot 2.6.2 (Gradle)
- JDK 11(Java 11)
- IntelliJ
- Postman
- Apache Kafka 3.0.0
[build.gradle dependencies]
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.2.1'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'mysql:mysql-connector-java'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
1. Spring Boot Project 생성
1. start.spring.io 사이트에서 다음과 같이 프로젝트 Dependencies를 추가하여 생성합니다.
2. 그 후 압축을 풀어 인텔리제이 프로젝트로 열어줍니다.
(처음에는 Dependencies를 다운로드하느라 시간이 소요됩니다.)
2. 설정 파일
Kafka Broker와의 연동, Mysql 연동을 위해 application.propertie 파일을 수정합니다.
#Consumer
spring.kafka.consumer.bootstrap-servers=카프카ip:카프카port
spring.kafka.consumer.group-id=그룹ID
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#MYSQL DB
spring.datasource.hikari.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.hikari.jdbc-url=jdbc:mysql://DB주소:포트/kamp_ai?useSSL=false&&serverTimezone=UTC
spring.datasource.hikari.username=계정
spring.datasource.hikari.password=비밀번호
3. MyBatis 설정 클래스
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import javax.sql.DataSource;
@Configuration
@PropertySource("classpath:/application.properties")
public class DBConfiguration {
@Autowired
private ApplicationContext applicationContext;
@Bean
@ConfigurationProperties(prefix="spring.datasource.hikari")
public HikariConfig hikariConfig() {
return new HikariConfig();
}
@Bean
public DataSource dataSource() throws Exception {
DataSource dataSource = new HikariDataSource(hikariConfig());
return dataSource;
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
sqlSessionFactoryBean.setMapperLocations(applicationContext.getResources("classpath:/mapper/*.xml")); // mapper 파일 경로
sqlSessionFactoryBean.setTypeAliasesPackage("consumer.domain"); // DTO, DOMAIN 경로
return sqlSessionFactoryBean.getObject();
}
@Bean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
4. Domain 생성
import lombok.Data;
@Data
public class Test {
private String text;
}
5. Mapper.xml 생성
<mapper namespace="consumer.mapper.TestMapper"> mapper 인터페이스 경로와 잘 맞춰줍니다.
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="consumer.mapper.TestMapper">
<insert id="testInsert" parameterType="Test">
INSERT INTO test
VALUES (#{text})
</insert>
</mapper>
6. Mapper.interface 생성
import consumer.domain.Test;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TestMapper {
// 토큰 권한 조회
void testInsert(Test test) throws Exception;
}
7. Kafka Consumer 생성
@KafkaListener를 활용해 쉽게 컨슈머를 생성할 수 있습니다.
import consumer.domain.Test;
import consumer.mapper.TestMapper;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
@Log4j2
public class KafkaConsumer {
@Autowired
TestMapper testMapper;
@KafkaListener(topics = "kafka-test", groupId = "testgroup2")
public void consume(String message) throws Exception {
Test test = new Test();
test.setText(message);
testMapper.testInsert(test);
}
}
8. Kafka Topic 생성
다음 명령어를 이용해 Topic을 생성합니다.
# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafka-test
9. Mysql Table 생성
CREATE TABLE `test` (
`text` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8_general_ci'
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;
10. Kafka Console Producer를 활용한 테스트
다음 명령어를 이용해 Console Producer를 실행합니다.
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-test
11. 테스트
스프링 부트를 실행 후 Console Producer를 이용해 데이터를 입력합니다.
'Backend > Kafka' 카테고리의 다른 글
[Kafka] 카프카 Burrow 설치 (카프카 모니터링) (0) | 2022.01.29 |
---|---|
[Kafka] 기본 카프카 명령어 (command-line tool) (0) | 2022.01.13 |
[Spring for Apache Kafka] Error while fetching metadata with correlation id (0) | 2022.01.10 |
[Spring for Apache Kafka] Apache Kafka Producer (0) | 2022.01.09 |
[Kafka] Linux Kafka 설치 (Apache Kafka 3.0.0) (0) | 2022.01.08 |