본문 바로가기

Programming!

MQTT를 발행/수신 해보자.

작은 쇼핑몰을 만들어 보려고 하면서 뭔가 시나리오를 크게 잡고 하기보다는( 이렇게 하면 만들다 지쳐버린 경험도 있고 해서..) 작은 단위의 시나리오로 만들려고 한다.

기존 JPA관련 간단팁으로 정리하던 repository를 복사해서 시작하기로 맘을 먹고 준비땅을 외침.

멀티 데이터베이스, 멀티 모듈등 기본 인프라사항을 적용해놓고, 전시카테고리를 구현하려고 하던중 급 MQTT에 대한 처리를 하고 싶어졌다.

기본 단위의 시나리오는 이렇다.

'사용자가 주문을 한다 -> 주문 프로세스가 완료된다 -> 완료 정보는 사용자에게 돌아간다. -> ASYNC로 주문완료 토픽을 발행한다. -> 구독 디바이스에서 해당 정보를 확인한다.' 

간단하다. 아주~

대충 토픽을 가져가는 서비스들은 POS라던지 EMAIL 이라던지...이라던지..등등이 되지 싶다.

 

그래서 AWS의 서비스를 이용하기전 로컬환경에서 위의 환경을 구축해보기로 하고, 이리저리 정보를 찾아 다녀보니 그냥 eclipse-mosquitto의 도커 컨테이너를 올리면 될 듯 하더라.

소스에 있는 docker-compose.yml 파일에 추가를 했다.

  mqtt:
    image: eclipse-mosquitto:1.6.4
    restart: always
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - /workspaces/docker-workspaces/mqtt/config:/mosquitto/config
      - /workspaces/docker-workspaces/mqtt/data:/mosquitto/data
      - /workspaces/docker-workspaces/mqtt/log:/mosquitto/log

관련 env파일과 conf파일을 기존 개발환경에 맞추고 실행. 현재 나의 도커 환경이다. 아..메모리..( 10개월만 있으면 32G로 받는다 아싸!!! )

docker-compose로 network를 서로 바인드 하지 않았다면  아이피를 기억해두자.

콘솔에서 '$> docker inspect 컨테이너아이디' 로 확인 할 수 있다.

추가로 MAC OS라면 https://mqttfx.jensd.de/ 이놈을 받아 두면 좋다.

다시 본작업으로 돌아와서. 이제 Spring boot에 알맞은 Configuration 코드를 만들어 올리자.

MqttConfiguration 으로 설정을 파일을 지정하자. 'org.eclipse.paho.client.mqttv3'를 사용하지 않고 'spring-integration-xxxx'를 사용할 것이므로 의존라이브러리도 추가해준다.

build.gradle

	compile("org.springframework.integration:spring-integration-mqtt")
	compile("org.springframework.integration:spring-integration-stream")

MqttConfiguration.java

....
	@ServiceActivator(inputChannel = "mqttOrderOutboundChannel")
	public MessageHandler mqttOrderMessageHandler() {
		MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
		messageHandler.setAsync(true);
		messageHandler.setDefaultTopic(defaultTopic);
		return messageHandler;
	}

	@Bean
	public MessageChannel mqttOrderOutboundChannel() {
		return new DirectChannel();
	}
....

이때 @MessagingGateway 를 사용할 것이므로 Configuration에 @IntegrationComponentScan 를 꼭 추가해준다. (https://docs.spring.io/spring-integration/api/org/springframework/integration/annotation/MessagingGateway.html)

@MessagingGateway의 인터페이스를 작성한다.

@MessagingGateway(defaultRequestChannel = "mqttOrderOutboundChannel")
public interface MqttOrderGateway {
	void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}

음.. 명시적으로 Order라는 중간 명칭을 붙이기는 했는데.. 범용으로 쓰는게 맞는지, Order전용으로 쓰는게 맞는지 아직은 확신할 수가 없어서 우선은 Order를 붙이기로 했다.

이렇게 하면 코드상 끝이다. 엌!..

자 테스트를 돌려보자.

@Slf4j
@RunWith(SpringRunner.class)
@ActiveProfiles(inheritProfiles = false, resolver = ActiveProfileResolver.class)
@SpringBootTest(classes = CoreApplication.class)
public class MqttConfigurationTest {

	@Autowired
	MqttOrderGateway mqttOrderGateway;
	
	@Test
	public void test() {
		log.debug("토픽 발행을 시작합니다.");
		mqttOrderGateway.sendToMqtt("즐거운 추석이에요.", "/message/topic");
		log.debug("토픽 발행이 종료되었습니다.");
	}
}

실행을 해보자!

올~~~ 녹색바!!! 엌.. 근데 이게 뭐야 끝난거 맞아? 발행된거 맞아? 

이래서 위에 MQTT.fx를 받아 놓았던 것이였다. 자 이제 설치했던 MQTT.fx를 실행하자.

커넥션정보를 등록한다. 아직 로컬기반에 별도의 시큐리티는 지정을 하지 않았으니 기본으로 세팅하면 된다.

Apply를 하고나면 구독정보를 지정할 수 있다. Connect후, Topic에 대한 path를 지정.. 이제 구독후 대기..

다시 테스트 코드를 실행해보자. 욧!!! 즐거운 추석이에요~

이제 안드앱을 만들어서 다중 디바이스에서 확인 가능한지 해보자.