프로그래밍언어/Golang

DB 업데이트, SQS 메시지 발송 트랜잭션으로 묶을 수 있을까?

D.Y 2025. 7. 18. 09:53
반응형

DB 업데이트와 SQS 메시지 발송을 하나의 트랜잭션 즉, 원자적 단위로 묶을 수 없다.

그 이유는

  • 관계형 데이터베이스 트랜잭션은 DB 내부에서만 원자성이 보장되고
  • SQS(혹은 대부분의 메시지 큐)는 데이터베이스와의 2PC(분산 트랜잭션, XA 트랜잭션)을 지원하지 않기 때문이다.
  • 즉, 둘은 완전히 별도의 시스템이기 때문에 원자적 보장이 없으므로, 하나만 성공하거나 실패할 수 있는 상황이 생길 수 있다.

나는 주로 어떤 방식으로 구현했는가? (어떤 선택을 했었더라..?)

  1. DB 업데이트 트랜잭션을 실행한다.
  2. 트랜잭션 커밋에 성공하면 SQS 메시지를 전송한다.
  3. 트랜잭션 커밋에 실패하면 SQS 메시지를 전송하지 않는다.
  4. SQS 메시지를 전송에 실패할 경우 DB에 기록하고 후처리가 가능하도록 한다.

내가 선택한 방식에 대한 이유

  • SQS의 가용성은 월간 99.9%이기 때문에 메시지 전송에 실패하는 경우는 거의 존재하지 않을거라고 판단했다.
  • 하지만, 0.1%에 대비해 실패하는 경우에 후처리가 가능하도록 DB에 실패한 내역을 기록했다.
  • 만약 DB에 실패한 내역이 많이 쌓이는 경우에는 실패 목록에 대해서 Batch등으로 다시 SQS로 전송할 수 있다.

현재 방식의 한계점

  • SQS 메시지 전송과 DB 기록 사이에 다시 한 번 장애가 발생하면?
    • 예를들어, SQS 발송 실패 → DB에 실패 내역을 기록할때 DB 장애 발생
    • 아주 드문 경우지만 SQS 전송 실패에 대해 알 수 없는 상황이 생길 수 있다.
    • 즉, 논리적 일관성이 100% 보장되지 않는다.

논리적으로 일관성을 100% 보장할 수 없을까?

  • 이럴때, 트랜잭셔널 아웃박스 패턴을 사용한다.

트랜잭셔널 아웃박스 패턴

  • DB 업데이트 로직과 함께 별도의 아웃박스 테이블에 발생해야할 SQS 메시지 전송 이벤트에 대해서도 하나의 트랜잭션으로 DB에 기록한다.
  • 별도의 프로세스는 SQS 메시지 전송 이벤트가 저장된 테이블을 이용해서 SQS로 메시지를 발송한다. 그리고 발송에 성공하면 발송 완료로 처리한다.

예제 코드

// Outbox_events
CREATE TABLE outbox_events (
	id BIGINT AUTO_INCREMENT PRIMARY KEY,
	event_type VARCHAR(64) NOT NULL,
	payload TEXT NOT NULL,
	created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
	processed_at DATETIME,
	status VARCHAR(16) NOT NULL DEFAULT 'PENDING'
);
// 서비스 로직 (DB 트랜잭션 + Outbox 기록)
func CompleteOrder(db *sql.DB, orderId int64) error {
	// 하나의 트랜잭션으로 원자성 보장
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	
	defer func() {
		if p := recover(); p != nil {
			tx.Rollback()
			panic(p)
		}
	}()
	
	// 주문 상태 완료로 업데이트
	_, err = tx.Exec("UPDATE orders SET status = ? WHERE id = ?", "COMPLETED", orderId)
	if err != nil {
		tx.Rollback()
		return err
	}
	
	// 주문 상태 완료 메시지 전송을 위한 Outbox 이벤트 기록
	payload, _ := json.Marshal(map[string]interface{}{
		"order_id": orderId,
		"status": "COMPLETED",
	})
	
	_, err := tx.Exec(`
		INSERT INTO outbox_events (event_type, payload)
		VALUES (?, ?)`,
		"OrderCompleted", string(payload))
	if err != nil {
		tx.Rollback()
		return err
	}
	
	return tx.Commit()
}
// Outbox 워커 (SQS 전송)
func OutboxWorker(db *sql.DB, sqsClient *sqs.Client, queueURL string) {
	for {
		// SQS 전송이 필요한 메시지 조회
		rows, err := db.Query(`
			SELECT id, event_type, payload FROM outbox_events
			WHERE status = 'PENDING'
			ORDER BY created_at ASC
			LIMIT 10`)
		if err != nil {
			log.Println("DB Query error:", err)
			time.Sleep(1 * time.Second)
			continue
		}
		
		for rows.Next() {
			var id int64
			var eventType, payload string
			if err := rows.Scan(&id, &eventType, &payload); err != nil {
				log.Println("Scan error:", err)
				continue
			}
			
			// SQS 메시지 전송
			_, err := sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{
				QueueUrl: aws.String(queueURL),
				MessageBody: aws.String(payload),
				MessageGroupId: aws.String(eventType),
			})
			if err != nil {
				log.Println("SQS send error:", err)
				continue
			}
			
			// 성공 시 상태 갱신
			_, err = db.Exec(`
				UPDATE outbox_events
				SET status = 'PROCESSED', processed_at = NOW()
				WHERE id = ?`, id)
			if err != nil {
				log.Println("DB update error:", err)
			}
		}
		
		rows.Close()
		time.Sleep(1 * time.Second) // 워커 주기
	}
}

위의 워커 코드는 분산환경에서 동일한 데이터를 중복으로 SQS에 메시지를 전송할 수 있다. 해결하기 위해서는 PROCESSING 이라는 상태를 하나 더 추가해서 해결할 수 있다.

  • 예를들어 분산환경에서 워커 1과 워커 2가 동시에 PENDING 상태의 Outbox event를 조회했다.
  • 메시지를 전송하기 전에 각자 이벤트의 상태를 PROCESSING 으로 업데이트한다. 이때 중요한 것은 WHERE 조건에 status가 PENDING인 이벤트만 업데이트 하는 것이다. 이럴 경우 동시성 제어로 오직 하나의 워커만 update count를 반환 받을 것이다.
  • 이때, update count가 1인 이벤트만 SQS 메시지를 전송하면 된다.

DB 동시성 제어 원리

  • RDBMS는 row-level lock (행 단위 락)이나 MVCC(멀티 버전 컨커런시 컨트롤)로 UPDATE 충돌 시 동시 실행을 허용하지 않음
  • 두 워커가 동시에 UPDATE 쿼리를 날려도 DB 내부적으로는 머저 도달한 쿼리가 락을 점유함
  • 나머지 쿼리는 먼저 도달한 트랜잭션의 커밋/롤백 이후에 실행됨
  • 그때는 이미 status가 PENDING이 아니므로 WHERE 조건에 맞지 않아 update count = 0이 된다.
// 개선 된 Outbox 워커 (SQS 전송)

type OutboxEvent struct {
	ID int64
	EventType string
	Payload string
}

func OutboxWorker(db *sql.DB, sqsClient *sqs.Client, queueURL string, batchSize int) {
	for {
		events, err := fetchAndMarkProcessing(db, batchSize)
		if err != nil {
			log.Println("Fetch error:", err)
			time.Sleep(2 * time.Second)
			continue
		}		
		if len(events) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}
		for _, event := range events {
			processEventAndMarkDone(db, sqsClient, queueURL, event)
		}
	}
}

func fetchAndMarkProcessing(db *sql.DB, batchSize int) ([]OutboxEvent, error) {
	// 처리 안된 이벤트 조회
	rows, err := db.Query(`
		SELECT id, event_type, payload
		FROM outbox_events
		WHERE status = 'PENDING'
		ORDER BY created_at ASC
		LIMIT ?`, batchSize)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	
	var events []OutboxEvent
	for rows.Next() {
		var event OutboxEvent
		if err := rows.Scan(&event.ID, &event.EventType, &event.Payload); err != nil {
			return nil, err
		}
		events = append(events, event)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}	
	
	// 각 이벤트별로 status를 PROCESSING으로 변경 (조건부 업데이트)
	var claimed []OutboxEvent
	for _, event := range events {
		res, err := db.Exec(`
			UPDATE outbox_events
			SET status = 'PROCESSING'
			WHERE id = ? AND status = 'PENDING'`, event.ID)
			if err != nil {
				continue
			}
			n, _ := res.RowsAffected()
			if n == 1 {
				claimed = append(claimed, event)
			}
	}
	return claimed, nil
}

func processEventAndMarkDone(db *sql.DB, sqsClient *sqs.Client, queueURL string, event OutboxEvent) {
	// SQS 발송
	_, err := sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{
		QueueUrl: aws.String(queueURL),
		MessageBody: aws.String(event.Payload),
		MessageGroupId: aws.String(event.EventType),
	})
	if err != nil {
		log.Printf("SQS send error: %v (id=%d)\\n", err, event.ID)
		// 실패시 PENDING 상태로 롤백
    db.Exec(`
	    UPDATE outbox_events
		  SET status = 'PENDING'
      WHERE id = ? AND status = 'PROCESSING'`, event.ID)
		return 		
	}
	
	// 성공시 PROCESSED로 상태 업데이트
	_, err = db.Exec(`
		UPDATE outbox_events
		SET status = 'PROCESSED', processed_at = NOW()
		WHERE id = ?`, event.ID)
	if err != nil {
		log.Printf("DB update error: %v (id=%d)\\n", err, event.ID)
	}
}

추가 내용

PROCESSING 상태 장기 체류에 대한 Recovery/재처리

  • 네트워크 장애, 워커 프로세스 다운 등으로
  • PROCESSING 상태 이벤트가 영원히 남을 수 있다.
  • 주기적으로(예: 5~10분에 한 번)같은 조건의 row를 다시 PENDING으로 돌려주는 Recovery/Retry 크론 배치가 필요하다.
  • PROCESSING 상태에서 processed_at IS NULL && updated_at < now() - INTERVAL '5 MIN'
UPDATE outbox_events
SET status = 'PENDING'
WHERE status = 'PROCESSING'
  AND processed_at IS NULL
  AND updated_at < NOW() - INTERVAL 5 MINUTE;

Idempotency(멱등성) 보장

  • 네트워크 이슈, SQS API 타임아웃 등으로 메시지 중복 전송이 실제로 발생할 수 있음
    • (예: SQS가 정상 수신했으나 워커는 타임아웃으로 실패로 인식, 재전송)
  • 해결책
    • SQS FIFO라면 MessageDeduplicationId를 사용
    • SQS Standard라면 payload에 고유 이벤트 ID를 포함시킴
    • 핵심: 메시지를 소비하는 하위 서비스가 “같은 이벤트 ID를 한 번만 처리(멱등성)”하도록 구현

예제 코드

sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{
	QueueUrl: aws.String(queueURL),
	MessageBody: aws.String(event.Payload),
	MessageGroupId: aws.String(event.EventType),
	// FIFO 큐라면 아래 필드도 사용
	MessageDeduplicationId: aws.String(strconv.FormatInt(event.ID, 10)),
})
반응형