본문 바로가기

STUDY/Spring

Spring Boot | Kafka를 이용한 채팅 (2) Kafka 연동 설정

 

프로젝트  구조

1. Kafka관련 설정하기

1-0. constant작성

✔️KAFKA_TOPIC - 생성한 토픽의 이름. 아래에서 토픽을 생성할 것임 (동일한 이름으로 생성하기)

✔️GROUP_ID - consumer를 식별할 수 있는 그룹

✔️KAFKA_BROKER - Kafka 클러스터에 연결하기 위한 호스트:포트 값

public class KafkaConstants {
    public static final String KAFKA_TOPIC = "kafka-chat";
    public static final String GROUP_ID = "foo";
    public static final String KAFKA_BROKER = "localhost:9092";
}

토픽 생성

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic [토픽 이름]

 

토픽 목록 조회

kafka-topics --list --zookeeper localhost:2181

1-1. ProducerConfiguration

✔️producer는 TOPIC에 메시지를 작성

✔️KafkaTemplate을 통해 TOPIC에 메시지를 보낼 수 있음

✔️BOOTSTRAP_SERVERS_CONFIG는 Kafka가 실행되는 주소를 설정

✔️KEY_SERIALIZER_CLASS_CONFIG와 VALUE_SERIALIZER_CLASS_CONFIG는 Kafka로 보내는 데이터의 키와 값을 직렬화함

✔️문자열을 넘길땐 StringSerializer.class를, JSON 데이터를 넘길 땐 JsonSerializer.class를 적어주면 됨

✔️properties나 yaml으로 설정할 수도 있고, 아래처럼 @Bean으로 설정해줄 수도 있음

@EnableKafka
@Configuration
public class ProducerConfiguration {

    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

    @Bean
    public Map<String, Object> producerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
        configurations.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configurations.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configurations;
    }

    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

1-2. ListenerConfiguration

✔️listener(consumer)는 Kafka로부터 메시지를 받는 곳

✔️GROUP_ID_CONFIG는 consumer group id를 설정

✔️KEY_DESERIALIZER_CLASS_CONFIG와 VALUE_DESERIALIZER_CLASS_CONFIG는 Kafka에서 받은 데이터의 키와 값을 역직렬화함

✔️AUTO_OFFSET_RESET_CONFIG에는 latest(가장 최근에 생성된 메시지를 offset reset), earliest(가장 오래된 메시지를), none의 값을 입력할 수 있음

@EnableKafka
@Configuration
public class ListenerConfiguration {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigurations(), new StringDeserializer(), new JsonDeserializer<>(Message.class));
    }

    @Bean
    public Map<String, Object> consumerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
        configurations.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);
        configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return configurations;
    }
}

1-3. MessageListener

✔️@KafkaListener 어노테이션을 통해 Kafka로부터 메시지를 받을 수 있음

✔️template.convertAndSend를 통해 WebSocket으로 메시지를 전송

✔️Message를 작성할 때 경로 잘 보고 import하시길... 

@Slf4j
@Component
public class MessageListener {

    @Autowired
    SimpMessagingTemplate template;

    @KafkaListener(
            topics = KafkaConstants.KAFKA_TOPIC,
            groupId = KafkaConstants.GROUP_ID
    )
    public void listen(Message message) {
        log.info("sending via kafka listener..");
        template.convertAndSend("/topic/group", message);
    }
}

1-4. WebSocketConfig

✔️endpoint에 작성한 값으로 client측에서 연결 (추후 React를 이용한 프론트단 작업 예정)

✔️setAllowedOrigins는 허용할 주소를 작성하면 되는데, "*"값은 모두 허용한다는 뜻

✔️applicationDestinationPrefixes는 메시지를 전송할 때 사용하는 url

@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/my-chat").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/kafka");
        registry.enableSimpleBroker("/topic/");
    }
}