ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spring] Spring boot에서 Amazon MQ(Active MQ) 연결하기
    웹 개발/Spring Framework 2020. 7. 16. 00:12

    Amazon MQ란?

    Amazon MQ는 클라우드에서 메시지 브로커를 쉽게 설정하고 운영할 수 있는 Apache ActiveMQ용 관리형 메시지 브로커 서비스입니다.

    Amazon MQ는 Active MQ를 사용하고 있다. 이름처럼 AWS에서 제공해주고 있는 서비스이기도 하다.

     

    Active MQ에 대해 더 알고 싶다면 여기를 참고해보자!

     

    다 읽어보진 않아도 어떤 아키텍처로 이루어져 있는지는 알아야쥬

     

    그림 하나만 보면 이해가 된다.

     

     

    메시지를 생산하는 Producer,

     

    메시지를 전달하는 Broker,

     

    받은 메시지를 소비하는 Consumer 모델로 이루어져있다.

     

    Active MQ는 Queue 방식과 Publish/Subscribe 방식을 이용해 메시지를 전달한다.

     

    Queue 방식을 사용하면 Consumer들에게 하나의 메시지를 RR(Round Robin)방식으로 하나의 Consumer에게 처리하도록 보낸다.

     

    Publish/Subscribe(펍섭이라고도 불리는) 방식은 Topic에 관심있는 모든 컨슈머에게 메시지가 전달된다.

     

    우리 서비스에서 어떤 방식을 사용할지 요구사항을 생각해 골라야했는데, 최종적으로는 Queue 방식을 선택했다. 

     

    Rabbit MQ를 사용해서 비슷한 방식으로 사용하고자 했던 것도 있지만,

     

    Publish/Subscribe 방식을 사용하면 하나의 서비스에서 여러 인스턴스를 띄우고 있기에 멱등성을 고려해야했고, 

     

    현재 한 서비스에서 다른 하나의 서비스끼리만 통신하고 있었다.(서비스 구조 상 하나의 Producer와 하나의 Consumer만 존재하는 상황)

     

    이런 점을 고려했을 때, Queue 방식을 선택했는데, 만약 하나의 메시지가 발행되었을 때, 각기 다른 서비스에서

     

    서로 다른 처리가 필요했더라면 Publish/Subscribe 방식을 선택했을 것이다.

     

    JMS

    Active MQ가 메시지를 주고 받는데 사용하는 기술은 JMS이다. 

     

    위키피디아의 내용을 빌리자면,

    둘 또는 그 이상의 클라이언트 사이에 메시지를 보내기 위한 자바 메시지 지향 미들웨어 API

     

    Spring boot에서 JMS를 이용해서 Active MQ를 쉽게 사용할 수 있다.

     


    Spring boot + Amazon MQ(message queue) 연결하기

    기존에는 Rabbit MQ를 사용하고 있어서 AMQP를 이용해서 전환하려했지만, 예제 코드나 자료가 거의 나오지 않았다.

     

    구글링과 spring boot에서 실험해본 애플리케이션 로그를 보았을 때, Amazon MQ는 AMQP 1.0을 사용하고 있지만 spring-boot-starter에 포함되어있는 AMQP 의존성은 1.0 밑으로 사용을 하고 있는 것으로 보였다.

     

    AMQP 1.0을 사용하도록 하는 방법을 찾는 도중에 명확하거나 시원한 해결방법이 보이지 않았고, 이럴 경우에는 향후 유지보수에도 어려움이 있어보였다.

     

    결국 코드 수정이 많더라도 AWS에서 예제코드로 안내되어있는 openWire를 사용하기로 했다. 

     

    openWire는 Active MQ가 사용하는 프로토콜이면서 Java 기반으로 구현된 프로토콜이다.

     

     

    의존성 가져오기

    해당 예시는 spring-boot 2.3.0 버전, jdk 1.8, 설정파일은 yml 기준으로 작성했다.

     

    아래 dependency를 build.gradle에 선언해준다.

     compile("org.springframework.boot:spring-boot-starter-activemq")
     compile("com.fasterxml.jackson.core:jackson-databind")
    

     

    spring guide에는 activemq-broker 의존성도 선언하라고 적혀있는데 spring-boot-starter-activemq 안을 들여다보니 activemq-broker 의존성도 들어와있어서 빼고 해봤더니 잘돼서 선언하지 않았다.

     

    의존성 관리를 spring boot에서해주니 넘나 편안.

     

    application.yml 설정 정보 입력하기

     

    application.yml에서 호스트 정보를 입력해 Spring Boot가 Active MQ에 대한 정보를 읽을 수 있도록 한다.

    spring:
      activemq:
        broker-url: failover:(ssl://{인스턴스1의 엔드포인트}, ssl://{인스턴스2의 엔드포인트})
        user: {유저 정보}
        password: {패스워드}

    failover를 사용하면 하나의 인스턴스가 죽었을 때 다른 인스턴스를 사용하게 된다. 운영에서 중요한 요소!일 것이다.

     

    Message Serialize 설정 잡아주기

    @Configuration
    public class JmsConfiguration {
    
        @Bean
        public MessageConverter jacksonJmsMessageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
    
            // java8에서 LocalDate, LocalDateTime 타입 사용을 위해 선언한 설정
            JavaTimeModule timeModule = new JavaTimeModule();
            timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ISO_LOCAL_DATE));
            timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
            timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ISO_LOCAL_DATE));
            timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
    
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.registerModule(timeModule);
    
            converter.setObjectMapper(objectMapper);
    
            return converter;
        }
    }

    메시지 컨버팅을 Jackson을 이용하며 String을 담는 Text 메시지 타입으로 설정한다.

     

    converter.setTypeIdPropertyName("_type")에서 TypeIdPropertyName은 객체를 구분하기 위한 속성의 이름이다. Jasckson mapper는 들어오는 Json을 deserialize를 할 때 무슨 객체를 사용하는지 알아야하기 때문. {"_type" : "com.zorba.Product"} 같은 내용이 포함될 것이다.

     

    번외: 메시지를 받기 위해 Listener의 Connection을 만들고 맺는 경우는 spring boot에서 자동으로 만들어주기 때문에 설정하지 않았다. (그렇지 않으면 JmsListenerContainerFactory를 만들어 Bean으로 등록해주어야한다. 커스텀이 필요하다면 JmsListenerContainerFactory를 반환해주는 Bean으로 등록하면 된다.)


    Queue방식으로 만들기

    이제부터 Publish/Subscribe 방식과 Queue 방식을 나눠서 설명하겠다. 위의 설명한 설정 방법은 모두 공통이다.

     

    Queue 등록하기

    예를 들어, 바코드가 생성되거나 사용되었을 때를 알고 싶어서 아래와 같은 Queue를 만들 수 있다.

    @Configuration
    public class JmsConfiguration {
    
        @Bean
        public ActiveMQQueue createBarcodeQueue() {
            return new ActiveMQQueue("barcode.create");
        }
        
        @Bean
        public ActiveMQQueue usedBarcodeQueue() {
            return new ActiveMQQueue("barcode.used");
        }
        
    }

    여기서 "barcode.create" 와 "barcode.used"가 Queue의 이름이 된다.

     

    해당 Queue의 이름을 지닌 Queue가 Amazon MQ에 생성된다.

     

    Producer 만들기 - Message 생성하기

    @Slf4j
    @Component
    public class JmsProducer {
    
        private final JmsTemplate jmsTemplate;
    
        // JmsTemplate 의존성을 주입받는다.
        // Spring 4.3부터 @Autowired 생략 가능
        public JmsProducer(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
        
        // JmsTemplate을 통해 인자로 받은 Topic 이름에 맞는 Topic에 메시지를 발행한다.
        public void publish(String topicName, Object value) {
            jmsTemplate.convertAndSend(topicName, value);
        }    
        
    }
    
    
    // 다른 클래스 Service -> JmsPublisher를 사용할 때
    @Service
    public class BarcodeHandlingService {
    
        private final JmsProducer jmsProducer;
        
        public BarcodeHandlingService(JmsProducer jmsProducer) {
        	this.jmsProducer = jmsProducer;
        }
        
        public void createBarcode() {
            // other processing
            // build barcodeObject        
            
            jmsProducer.publish("barcode.create", barcodeObject);
        }
        
        public void usedBarcode() {
        	// other processing
            // build barcodeObject
            
            jmsProducer.publish("barcode.used", barcodeObject);
        }
    }

     

    Consumer 만들기 - Message 수신하기

    @Component
    public class BarcodeHandleConsumer {
    
        @JmsListener(destination = "barcode.create")
        public void consumeBarcodeCreateMessage(@Payload BarcodeObject barcodeObject) {
        	// 바코드 생성 시 필요한 로직 수행 
        }
        
        @JmsListener(destination = "barcode.used")
        public void consumeBarcodeUsedMessage(@Payload BarcodeObject barcodeObject) {
        	// 바코드 사용되었을 시 필요한 로직 수행 
        }
    }

     

    받고자하는 Topic 이름을 @JmsListener의 destination에 입력한다.

     

    @Payload는 Message에 내가 꼭 넘긴 객체나 인자가 말고도 부가정보 등을 받을 수 있는데 @Payload는 내가 보낸 body만 받도록 한다.

     

    @Headers Map<String, Object> headers 로해서 인자를 하나 더 받으면 다른 부가정보를 볼 수 있다.

     

    가이드 문서들을 보면 JmsListener를 사용하기 위해 설정 시, @EnableJms를 사용하고 있는데, @EnableJms는 @JmsListener를 선언한 메서드들을 찾아서 메시지 리스너 컨테이너를 속하도록 만들고 생성해주는 annotation이다.

     

    포스팅 초반에 설명했듯이 Spring boot에서 JmsListenerContainerFactory를 기본적으로 만들어주기 때문에 선언하지 않았다.


    Publish/Subscribe방식으로 만들기

    중복되는 설명은 줄이고자 차이점만 설명하자면, Topic 방식을 사용하려면 application.yml Queue 등록 방식이 Topic으로 바뀐다.

     

    application.yml에 하나 더 코드를 작성해야한다.

    spring:
      activemq:
        broker-url: failover:(ssl://{인스턴스1의 엔드포인트}, ssl://{인스턴스2의 엔드포인트})
        user: {유저 정보}
        password: {패스워드}
      jms: 
        pub-sub-domain: true // 요 녀석을 true로 바꿔주어야한다.

    jms에서 pub-sub 방식을 사용하겠다고 flag를 true로 바꿔줘야한다.

    그렇지 않으면 Publish/Subscribe 방식이 아닌 Queue 방식으로 메시지가 전달된다.

     

    Topic 등록하기

    @Configuration
    public class JmsConfiguration {
    
        @Bean
        public ActiveMQTopic createBarcodeTopic() {
            return new ActiveMQTopic("barcode.create");
        }
        
        @Bean
        public ActiveMQTopic usedBarcodeTopic() {
            return new ActiveMQTopic("barcode.used");
        }
        
    }

    여기서 "barcode.create" 와 "barcode.used"가 Topic의 이름이 된다.

     

    해당 Topic의 이름을 지닌 Topic이 Amazon MQ에 생성된다.


     

    참고

    spring boot, amazon mq 사용 전반적인 글

    typeIdProperty에 대한 내용

    failOver에 대한 내용

    Active MQ 아키텍처에 대한 내용 

    Spring Boot에서 JMS를 이용하는 방법 

Designed by Tistory.