본문 바로가기

AWS Simple Queue Service 맛보기

AWS SQS

aws에는 sqs라는 간단한 queue를 만들 수 있는 서비스가 존재한다. 이번 포스팅에서는 sqs에서 fifo queue를 만들고, python boto3를 이용해 sqs에서 데이터를 넣고, 받는 작업을 진행해 볼거다.

이 포스팅 github

https://github.com/ladianchad/aws_sqs_study

 

GitHub - ladianchad/aws_sqs_study

Contribute to ladianchad/aws_sqs_study development by creating an account on GitHub.

github.com

Queue 생성

  1. 대기열 생성 클릭
  2. fifo 선택후 대기열 이름 입력(fifo로 선택하면 무조건 .fifo로 끝나는 이름이어야 한다.) 및 생성 완료
  3. 생성 완료

이제 SQS를 사용하기 위한 준비는 끝났다.

메세지 전송 및 수신을 눌러 간단하게 SQS에 데이터를 넣어보자.

**MessageGroupId 와 MessageDeduplicationId는 필수 항목이며, MessageDeduplicationId는 고유한 값이어야 한다.(중복시 자동으로 message가 삭제 된다.)

 

Boto3

boto3는 간단히 말해서 aws service를 사용하기 쉽게 만들어주는 python libaray이다. boto3가 sqs도 지원을 해주니 이를 이용해서 sqs에 데이터를 넣고, 받아 보자. 자세한 document를 원하면 아래 링크로 들어가면 된다.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html

 

SQS — Boto3 Docs 1.21.1 documentation

QueueUrl (string) -- [REQUIRED] The URL of the queue from which the PurgeQueue action deletes messages. Queue URLs and names are case-sensitive.

boto3.amazonaws.com

나는 좀더 간단하게 하기 위해 class를 하나 만들어 줬다.

from typing import Any
import boto3
from botocore.config import Config
import json
import queue
import time

class SqsManger():

    class MessageDto():
        def __init__(self, receipt_handle: str, data : Any) -> None:
            self.receipt_handle = receipt_handle
            self.data = data
        
        def __str__(self) -> str:
            return f'receipt_handle : {self.receipt_handle}\\ndata : {self.data}'

    def __init__(self, region: str, access_key: str, secret_key: str, queue_url: str, batch_size = 1, group_id : str = 'default') -> None:
        config = Config(
            region_name = region
        )
        
        self.access_key = access_key
        self.secret_key = secret_key
        self.region = region
        self.batch_size = batch_size
        self.group_id = group_id

        self.client = boto3.client('sqs', 
            config = config,
            aws_access_key_id = access_key,
            aws_secret_access_key = secret_key
        )

        self.queue_url = queue_url
        self.queue = queue.Queue()

    def __str__(self) -> str:
        return f'using boto3 sqs client\\naccess_key_id is {len(self.access_key)} character start with {self.access_key[0]}\\nsecret_key is {len(self.secret_key)} character start with {self.secret_key[0]}\\nregion is {self.region}\\nqueue name is {self.queue_url}'

    def __receivce(self) -> None:
        response = self.client.receive_message(
            QueueUrl = self.queue_url,
            MaxNumberOfMessages = self.batch_size
            )
        if 'Messages' in response:
            for message in response['Messages']:
                body = ''
                try:
                    body = json.loads(message['Body'])
                except :
                    body = message['Body']
                data = SqsManger.MessageDto(
                    receipt_handle = message['ReceiptHandle'],
                    data = body
                )
                self.queue.put(data)
    
    def get(self) -> Any:
        if self.queue.empty():
            self.__receivce()
        if not self.queue.empty():
            message : SqsManger.MessageDto = self.queue.get()
            self.client.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message.receipt_handle
            )
            return message.data
        return None
    
    def put(self, data) -> None:
        self.client.send_message(
            QueueUrl = self.queue_url,
            MessageBody = json.dumps(data),
            MessageGroupId = self.group_id,
            MessageDeduplicationId = str(time.time())
        )
  • 설명
    • receive_message
      • SQS에서 데이터를 가져오는 함수이다.
      • 기본적으로 fifo로 설정하면 같은 message는 한번만 수신 된다.
    • MaxNumberOfMessages
      • boto3에서 default 값은 1로 되어있다.
      • SQS에서 가져오는 최대 message 개수이다.
      • 특별한 일 없으면 1로 하자.
    • ReceiptHandle
      • message를 삭제할 때 필요한 key값 같은 것이다.
      • 분실 되면 message가 설정해 놓은 기간이 다 될 때까지 못 지운다...
    • delete_message
      • message를 지워주는 역할이다.
        • SQS에서 message를 가져왔다 해서 자동으로 삭제 되는게 아니다. 지워주자.
    • send_message
      • SQS에 데이터를 넣어주는 함수이다.
    • MessageDeduplicationId
      • 고유한 값이 들어가야하기 때문에 message가 만들어진 시점의 시간을 key값으로 사용하였다.

모든 설정이 끝나면 다음과 같이 잘 작동하는걸 확인 할 수 있다.