1. Kafka관련 설정하기
1-0. constant작성
✔️KAFKA_TOPIC - 생성한 토픽의 이름. 아래에서 토픽을 생성할 것임 (동일한 이름으로 생성하기)
✔️GROUP_ID - consumer를 식별할 수 있는 그룹
✔️KAFKA_BROKER - Kafka 클러스터에 연결하기 위한 호스트:포트 값
public class KafkaConstants {
public static final String KAFKA_TOPIC = "kafka-chat";
public static final String GROUP_ID = "foo";
public static final String KAFKA_BROKER = "localhost:9092";
}
토픽 생성
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic [토픽 이름]
토픽 목록 조회
kafka-topics --list --zookeeper localhost:2181
1-1. ProducerConfiguration
✔️producer는 TOPIC에 메시지를 작성
✔️KafkaTemplate을 통해 TOPIC에 메시지를 보낼 수 있음
✔️BOOTSTRAP_SERVERS_CONFIG는 Kafka가 실행되는 주소를 설정
✔️KEY_SERIALIZER_CLASS_CONFIG와 VALUE_SERIALIZER_CLASS_CONFIG는 Kafka로 보내는 데이터의 키와 값을 직렬화함
✔️문자열을 넘길땐 StringSerializer.class를, JSON 데이터를 넘길 땐 JsonSerializer.class를 적어주면 됨
✔️properties나 yaml으로 설정할 수도 있고, 아래처럼 @Bean으로 설정해줄 수도 있음
@EnableKafka
@Configuration
public class ProducerConfiguration {
@Bean
public ProducerFactory<String, Message> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigurations());
}
@Bean
public Map<String, Object> producerConfigurations() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
configurations.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configurations.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return configurations;
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
1-2. ListenerConfiguration
✔️listener(consumer)는 Kafka로부터 메시지를 받는 곳
✔️GROUP_ID_CONFIG는 consumer group id를 설정
✔️KEY_DESERIALIZER_CLASS_CONFIG와 VALUE_DESERIALIZER_CLASS_CONFIG는 Kafka에서 받은 데이터의 키와 값을 역직렬화함
✔️AUTO_OFFSET_RESET_CONFIG에는 latest(가장 최근에 생성된 메시지를 offset reset), earliest(가장 오래된 메시지를), none의 값을 입력할 수 있음
@EnableKafka
@Configuration
public class ListenerConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigurations(), new StringDeserializer(), new JsonDeserializer<>(Message.class));
}
@Bean
public Map<String, Object> consumerConfigurations() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return configurations;
}
}
1-3. MessageListener
✔️@KafkaListener 어노테이션을 통해 Kafka로부터 메시지를 받을 수 있음
✔️template.convertAndSend를 통해 WebSocket으로 메시지를 전송
✔️Message를 작성할 때 경로 잘 보고 import하시길...
@Slf4j
@Component
public class MessageListener {
@Autowired
SimpMessagingTemplate template;
@KafkaListener(
topics = KafkaConstants.KAFKA_TOPIC,
groupId = KafkaConstants.GROUP_ID
)
public void listen(Message message) {
log.info("sending via kafka listener..");
template.convertAndSend("/topic/group", message);
}
}
1-4. WebSocketConfig
✔️endpoint에 작성한 값으로 client측에서 연결 (추후 React를 이용한 프론트단 작업 예정)
✔️setAllowedOrigins는 허용할 주소를 작성하면 되는데, "*"값은 모두 허용한다는 뜻
✔️applicationDestinationPrefixes는 메시지를 전송할 때 사용하는 url
@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/my-chat").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/kafka");
registry.enableSimpleBroker("/topic/");
}
}
'STUDY > Spring' 카테고리의 다른 글
Spring Boot | S3 Pre-Signed URL 생성 (0) | 2021.01.28 |
---|---|
Spring Boot | Kafka를 이용한 채팅 (3) 메시지 주고받기 + ReactJS (9) | 2021.01.13 |
Spring Boot | Kafka를 이용한 채팅 (1) Kafka설치 및 프로젝트 생성 (1) | 2021.01.12 |
Spring Boot | S3파일 업로드 후 CloudFront SignedURL 생성하기 (0) | 2020.12.24 |
Spring Boot | S3 파일 업로드 (0) | 2020.12.22 |