반응형

AWS SQS가 뭐지?

Amazon Simple Queue Service(SQS)는 쉽게 말해서 Queue입니다.
마이크로서비스, 분산 시스템 등에서 메시지 손실 걱정 없이 어떤 볼륨의 메시지든 전송, 저장 및 수신이 가능한 메시지 대기열입니다.
표준대기열, FIFO 대기열 유형이 있습니다.
 
 
그럼 SQS 송수신 구현을 해보도록 하겠습니다~

1. AWS SQS 대기열 생성

대기열 유형을 선택해야하는데, 각 장단점이 있습니다.
표준 대기열 : 무제한 처리량, 두 번 이상 메시지가 전달될 수 있음, 순서 보장이 안됨
FIFO 대기열 : 정확한 처리, 순서 보장
 
저는 FIFO 방식으로 선택하겠습니다.
FIFO 대기열은 이름 뒤에 반드시 .fifo를 붙여야 합니다.
 

저는 KAKAO_MESSAGE.fifo 라는 이름의 SQS를 만들어보겠습니다.
 

 

2. SQS 메시지 전송 test

SQS가 생성이 됐다면 메시지 송수신 테스트를 해볼 수 있습니다.
 

메시지 전송 테스트

메시지 수신 테스트

메시지 폴링 버튼을 누르면 수신 테스트 시작이고, 그 사이에 메시지가 들어온다면 하단의 메시지란에 새로운 메시지가 생성이 됩니다.
 

3. IAM 사용자 생성

이제 코드에서 SQS 송수신을 하기 위해서 IAM 사용자를 생성해야 합니다.
aws-sqs-group이라는 이름으로 생성해 보겠습니다.
 

생성했다면 생성한 IAM 사용자의 key id와 access key를 가지고 코드를 작성해 보도록 할게요.
 

4. 코드 작성

Nestjs로 작성해 보겠습니다.
저는 @ssut/nestjs-sqs 라이브러리를 사용해서 기능 구현 해봤습니다!
 
라이브러리 install

npm i --save @ssut/nestjs-sqs
yarn add @ssut/nestjs-sqs

 

test-sqs.module.ts

sqs producer 설정을 해주는 부분입니다.

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from "@nestjs/config";
import { SqsModule } from "@ssut/nestjs-sqs";
import { SQSClient } from "@aws-sdk/client-sqs";

@Module({
  imports: [
    SqsModule.registerAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => {
        return {
          producers: [{
            name: configService.get('SQS_NAME'),
            queueUrl: configService.get('SQS_PRODUCER_QUEUE_URL'),
            sqs: new SQSClient({
              region: configService.get('AWS_REGION'),
              credentials: {
                accessKeyId: configService.get<string>('AWS_SQS_ACCESS_KEY_ID'),
                secretAccessKey: configService.get<string>('AWS_SQS_SECRET_ACCESS_KEY') ,
              }
            })
          }]
        };
      },
    })
  ],
  providers: [TestSqsService],
  exports: [TestSqsService]
})
export class TestSqsModule {}

 

test-sqs.service.ts

sqs 메시지 송신 기능 메소드 구현부입니다.

import { Inject, Injectable } from "@nestjs/common";
import { SqsService } from "@ssut/nestjs-sqs";
import { MessageDto } from "../common/dto/biz.msg.dto";
import { ConfigService } from "@nestjs/config";

@Injectable()
export class TestSqsService {
  constructor(
    private configService: ConfigService,
    private readonly sqsService: SqsService,
  ) {}

  /**
   * SQS Message 송신
   * @param message SQS 전송 메시지
   */
  async send(message: MessageDto[]) {
    try {
      const body = JSON.stringify(message);

      const payload = {
        id: uuid, // 중복 방지를 위해 uuid 사용
        body: body,
        groupId: uuid,
        deduplicationId: uuid
      }

      const response = await this.sqsService.send(
        this.configService.get('SQS_NAME'),
        payload
      );

      console.log(`[SQS] send successful : [${JSON.stringify(response)}]`);

      return response;
    } catch (error) {
      console.log(error);
    }
  }
}

 
sqs consumer 부분도 같은 라이브러리를 사용해서 만들 수 있습니다~
 

References

https://www.npmjs.com/package/@ssut/nestjs-sqs

반응형

+ Recent posts