RabbitMq Thread Instance 침범 문제

RabbitMq 를 이용하여 비동기 이벤트 처리를 할 때, 발생했던 문제 해결 과정을 공유한다.

상황

  • 처리량이 많은 이벤트가 자꾸 사라지는 문제가 발생하였다.
  • 초기엔 Log 를 줄여놓은 상태라, Log 를 추가하여 살펴본 결과 어느 순간 갑자기 event 가 사라진다.

    하지만 시스템에 Error/Exception은 발생하지 않고, 다음 event 는 계속 처리가 되고 있었다.

  • 내부 처리의 안정성을 위해 autoAck = false / qos = 30
  • autoAck 를 사용하지 않기 때문에, 메시지 유실은 없지만 후추 재발송 하는데 시간이 소요되었다.

원인

  • RabbitMq 는 Consumer 에서 Handler 발식으로 메시지를 발송한다.

    consumer = new DefaultConsumer(channel) {
    
        @Override
        public void handleDelivery(
            String consumerTag, 
            Envelope envelope, 
            AMQP.BasicProperties properties, 
            byte[] body) throws IOException {
          
            handler.handleDelivery(channel, envelope, properties, body);
        }
    };
  • 이를 Handler 에서 연결하여 처리 하도록 해두었다.

    public interface MqHandler {
            
        void handleDelivery(
            Channel channel, 
            Envelope envelope, 
            AMQP.BasicProperties properties, 
            byte[] body);
    }
  • 처리량이 많을 경우 Thread 를 생성해서 보내주게 되는데 처리를 할때 같은 객체에서 처리를 할 경우에, 하나의 Thread 가 멈추고 나중에 들어온 메시지가 처리 될 경우에 인스턴스에 멤버를 새로이 넣어서 문제가 발생하는 경우가 있었다.

    private SmsSendFacade smsSendFacade = new SmsSendFacade();
    public void handleDelivery(
        Channel channel, 
        Envelope envelope, 
        AMQP.BasicProperties properties, 
        byte[] body) {
         
        smsSendFacade.send(body);
    }

개선

  • Thread 간에 간섭이 일어나서 Instance 에 문제가 생기는 것이기 때문에 facade 에서 처리할 때, 서로 간섭하지 않도록 처리 하였다.

    public void handleDelivery(
        Channel channel, 
        Envelope envelope, 
        AMQP.BasicProperties properties, 
        byte[] body) {
         
        new SmsSendFacade().send(body);
    }
Written on June 11, 2018