

Spring Boot - Kafka Integration

Installing Kafka and ZooKeeper


1. Install Docker Compose

https://docs.docker.com/compose/install/


2. Get kafka-docker

$ git clone https://github.com/wurstmeister/kafka-docker


3. Edit the configuration

$ cd kafka-docker
$ vi docker-compose.yml

KAFKA_ADVERTISED_HOST_NAME: {your host IP}
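
If you are not sure which address to put there, the JDK can list the candidates. This is only a minimal sketch and not part of kafka-docker: the class name HostAddresses and the method nonLoopbackIpv4 are made up for illustration. It collects every non-loopback IPv4 address of the interfaces that are up; which one is right depends on your network, so pick the one your clients can actually reach.

```java
import java.io.UncheckedIOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper: lists candidate values for KAFKA_ADVERTISED_HOST_NAME.
class HostAddresses {

    // Returns the non-loopback IPv4 addresses of all interfaces that are up.
    static List<String> nonLoopbackIpv4() {
        List<String> result = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics == null) {
                return result; // no interfaces reported on this machine
            }
            for (NetworkInterface nic : Collections.list(nics)) {
                if (!nic.isUp() || nic.isLoopback()) {
                    continue;
                }
                for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                    if (address instanceof Inet4Address && !address.isLoopbackAddress()) {
                        result.add(address.getHostAddress());
                    }
                }
            }
        } catch (SocketException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        nonLoopbackIpv4().forEach(System.out::println);
    }
}
```

Docker's own bridge interfaces may show up in the list as well; those are usually not the address to advertise.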



4. Start the containers

$ docker-compose up -d


5. Enter the container and verify

$ docker ps -a

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
e391cac38357        kafkadocker_kafka        "start-kafka.sh"         27 hours ago        Up 27 hours         0.0.0.0:32769->9092/tcp                              kafkadocker_kafka_1
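
The PORTS column shows which host port is mapped to the broker's 9092 (32769 in this run; yours will likely differ). Before wiring up Spring, it can help to confirm that this port accepts TCP connections from the machine where the application will run. A minimal sketch, assuming a hypothetical helper class BrokerPortCheck; a successful connect only shows that the port mapping is open, not that Kafka itself is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks that a TCP port accepts connections.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions taken from the sample output; pass your own host and port.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 32769;
        System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 2000));
    }
}
```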


$ docker exec -it e391cac38357 /bin/bash


// Inside the container: list topics

bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181



Spring Boot - Kafka Integration


1. Create a topic

bash-4.3# kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mytopic



2. Spring Boot configuration

buildscript {
    ext {
        springBootVersion = '1.5.9.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

group = 'com.tistory.eclipse4j'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}


dependencies {
    compile('org.springframework.kafka:spring-kafka')
    compile('org.springframework.boot:spring-boot-starter-thymeleaf')
    compile('org.springframework.boot:spring-boot-starter-web')
    compileOnly('org.projectlombok:lombok')
    testCompile('org.springframework.boot:spring-boot-starter-test')
}


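import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

@EnableKafka
@Configuration
public class KafkaConfiguration {

    // Broker address as host:port, read from the kafka.bootstrapAddress property
    // (for example in application.properties). With the Docker setup above this is
    // typically the advertised host IP plus the host port mapped to 9092.
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;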

    public ProducerFactory<String, MyTopic> topicProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, MyTopic> topicKafkaTemplate() {
        return new KafkaTemplate<>(topicProducerFactory());
    }

    public ConsumerFactory<String, MyTopic> topicListenerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(MyTopic.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyTopic> topicKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyTopic> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(topicListenerFactory());
        return factory;
    }
}


3. Create the sender

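import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j // provides the "log" field used below
@Component
public class MyTopicKafkaSender {

    @Autowired
    private KafkaTemplate<String, MyTopic> topicKafkaTemplate;

    // Publishes the payload to the given topic; the JsonSerializer set in KafkaConfiguration writes it as JSON.
    public void send(String topic, MyTopic payload) {
        log.info("Send: topic={}, payload={}", topic, payload);
        topicKafkaTemplate.send(topic, payload);
    }
}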


4. Create the listener

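import java.util.concurrent.CountDownLatch;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class MyTopicKafkaListener {

    // Lets a test wait until the first message has been received.
    private final CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "mytopic", containerFactory = "topicKafkaListenerFactory")
    public void receive(MyTopic topic) {
        log.info("MyTopic => {}", topic);
        latch.countDown(); // release anyone waiting on getLatch()
    }
}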
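
The CountDownLatch in the listener exists so that a test can block until a message has arrived instead of finishing before the consumer thread runs. The pattern in isolation, as a JDK-only sketch with no Kafka involved: the class LatchDemo and the method awaitSimulatedMessage are hypothetical names, and a sleeping thread stands in for the listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// JDK-only illustration of the latch pattern; no Kafka involved.
class LatchDemo {

    // Starts a worker that "receives" after delayMillis and counts the latch down,
    // then waits up to timeoutMillis. Returns true if the worker finished in time.
    static boolean awaitSimulatedMessage(long delayMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis); // stands in for the time until a message arrives
                latch.countDown();         // what a listener does after handling the message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive for a worker nobody waits on
        worker.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitSimulatedMessage(100, 2000)); // worker finishes well before the timeout
        System.out.println(awaitSimulatedMessage(2000, 100)); // timeout expires first
    }
}
```

await returns true only if countDown was called before the timeout, so its return value tells a caller whether the message really arrived.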


5. Run the test

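import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

// Integration test: needs the Kafka broker from the Docker setup to be running.
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSenderSpringTest {

    @Autowired
    private MyTopicKafkaSender sender;
    @Autowired
    private MyTopicKafkaListener listener;

    @Test
    public void testSend() throws Exception {
        MyTopic myTopic = new MyTopic();
        myTopic.setKey("TopicKey");
        myTopic.setData("Data Topic");
        sender.send("mytopic", myTopic);
        // Wait up to 10 seconds so the listener can consume the message before the test context shuts down.
        listener.getLatch().await(10000, TimeUnit.MILLISECONDS);
    }

}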


6. Result

2017-12-05 00:09:34.810  INFO 8684 --- [ntainer#0-0-C-1] c.t.e.service.MyTopicKafkaListener       : MyTopic => MyTopic(key=TopicKey, data=Data Topic)
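
The MyTopic class itself is not listed in this post. Judging from the setKey/setData calls in the test and the toString output in the log line above, it is a simple object with two String fields. The following is a hand-written sketch under that assumption; the real project may well generate the same members with Lombok (lombok is on the compile classpath), so treat every detail here as a guess rather than the actual source.

```java
// Assumed shape of MyTopic, inferred from setKey/setData and the logged toString.
class MyTopic {

    private String key;
    private String data;

    // No-argument constructor; JSON deserializers such as Jackson typically rely on one.
    public MyTopic() {
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    // Mirrors the format seen in the log: MyTopic(key=..., data=...)
    @Override
    public String toString() {
        return "MyTopic(key=" + key + ", data=" + data + ")";
    }
}
```

The class is package-private here, which assumes it sits in the same package as the classes that use it; declare it public if it lives elsewhere.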


Git: https://github.com/eclipse4j/spring-boot-kafka