Kafka Zookeeper 설치
1. Docker-Compose 설치
https://docs.docker.com/compose/install/
2. kafka-docker 설치
$ git clone https://github.com/wurstmeister/kafka-docker
3. 설정 수정
$ vi docker-compose.yml
KAFKA_ADVERTISED_HOST_NAME :
{본인ip}
4. 도커실행
$ docker-compose up -d
5. 들어가서 확인해보기
$ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e391cac38357 kafkadocker_kafka "start-kafka.sh" 27 hours ago Up 27 hours 0.0.0.0:32769->9092/tcp kafkadocker_kafka_1
$ docker exec -it e391cac38357 /bin/bash
// 컨테이너 진입 - Topic 목록 보기
$ bash-4.3# kafka-topics.sh --list --zookeeper zookeeper
Spring Boot - Kafka 연계
1. 토픽만들기
bash-4.3# kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mytopic
2. Spring Boot Configuration 설정
buildscript {
ext {
springBootVersion = '1.5.9.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
group = 'com.tistory.eclipse4j'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.kafka:spring-kafka')
compile('org.springframework.boot:spring-boot-starter-thymeleaf')
compile('org.springframework.boot:spring-boot-starter-web')
compileOnly('org.projectlombok:lombok')
testCompile('org.springframework.boot:spring-boot-starter-test')
}
@EnableKafka
@Configuration
public class KafkaConfiguration {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ProducerFactory<String, MyTopic> topicProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, MyTopic> topicKafkaTemplate() {
return new KafkaTemplate<>(topicProducerFactory());
}
public ConsumerFactory<String, MyTopic> topicListenerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(MyTopic.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyTopic> topicKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyTopic> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(topicListenerFactory());
return factory;
}
}
3. Sender 만들기
@Component
public class MyTopicKafkaSender {
@Autowired
private KafkaTemplate<String, MyTopic> topicKafkaTemplate;
public void send(String topic, MyTopic payload) {
log.info("Send : Key={}, Topic ==================== > {}", topic, payload);
topicKafkaTemplate.send(topic, payload);
}
}
4. Listener 만들기
@Slf4j
@Component
public class MyTopicKafkaListener {
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "mytopic", containerFactory = "topicKafkaListenerFactory")
public void reciver(MyTopic topic) {
log.info("MyTopic => {}", topic);
}
}
5. Test 돌려보기
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSenderSpringTest {
@Autowired
private MyTopicKafkaSender sender;
@Autowired
private MyTopicKafkaListener listener;
@Test
public void testSend() throws Exception {
MyTopic myTopic = new MyTopic();
myTopic.setKey("TopicKey");
myTopic.setData("Data Topic");
sender.send("mytopic", myTopic);
listener.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
6. 결과
2017-12-05 00:09:34.810 INFO 8684 --- [ntainer#0-0-C-1] c.t.e.service.MyTopicKafkaListener : MyTopic => MyTopic(key=TopicKey, data=Data Topic)
Git : https://github.com/eclipse4j/spring-boot-kafka