프로그래밍언어/Golang

Go 언어로 PubSub 모델 개발하기

D.Y 2024. 11. 24. 10:44
반응형

Go 언어로 Pub/Sub(Publish-Subscribe) 라이브러리를 개발해보자. (심심하니까…)

목차

  1. Pub/Sub의 핵심 개념 이해
  2. 설계 구조 정의
  3. 단계별 개발
  4. 테스트 작성
  5. 고급 기능 추가

1. Pub/Sub의 핵심 개념 이해

Pub/Sub 시스템의 기본 원리는 다음과 같다.

  • Publisher: 메시지를 특정 Topic에 발행
  • Subscriber: Topic을 구독하고 해당 토픽의 메시지를 수신
  • Broker: 발행된 메시지를 관리하고 적절한 구독자에게 전달

2. 설계 구조 정의

Go의 구조체와 채널을 활용해 Pub/Sub 시스템을 설계할 수 있다. 아래는 기본 구성요소이다.

  • Topic: 메시지를 그룹화하는 주체
  • Publisher: 메시지를 발행하는 인터페이스
  • Subscriber: 메시지를 구독하는 인터페이스
  • Broker: 토픽별로 메시지를 관리하고, 발행/구독 로직을 처리

3. 단계별 개발

1단계: 기본 구조 만들기

Go의 기본 자료 구조와 채널을 활용해 구조체를 정의한다.

package pubsub

import "sync"

type PubSub struct {
	topics map[string][]chan string // 토픽별 구독 채널 목록
	mu sync.RMutex                  // 동시성 제어를 위한 Mutex
}

// NewPubSub initializes the PubSub instance.
func NewPubSub() *PubSub {
	return &PubSub {
		topics: make(map[string][]chan string),
	}
}

2단계: Publish 기능 구현

Publish 메서드는 특정 토픽에 메시지를 발행한다. 토픽에 연결된 모든 채널로 메시지를 전달한다.

func (ps *PubSub) Publish(topic, message string) {
	ps.mu.RLock()
	defer ps.mu.RUnlock()
	
	// 토픽의 모든 구독 채널로 메시지 전달
	if chans, found := ps.topics[topic]; found {
		for _, ch := range chans {
			ch <- message
		}
	}
}

3단계: Subscribe 기능 구현

Subscribe 메서드는 특정 토픽을 구독하고, 새 메시지를 수신하기 위한 채널을 반환한다.

func (ps *PubSub) Subscribe(topic string) <-chan string {
	ps.mu.Lock()
	defer ps.mu.UnLock()
	
	ch := make(chan string, 1) // 비동기 처리를 위해 버퍼 채널 사용
	ps.topics[topic] = append(ps.topics[topic], ch)
	
	return ch
}

4단계: Unsubscribe 기능 구현

구독을 해제하여 리소스를 관리한다.

func (ps *PubSub) Unsubscribe(topic string, subChan <-chan string) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	
	if chans, found := ps.topics[topic]; found {
		for i, ch := range chans {
			if ch == subChan {
				// 채널 제거
				ps.topics[topic] = append(chans[:i], chans[i+1:]...)
				close(ch) //채널 닫기
				break
			}
		}
	}
}

동시성 제어를 위해 Mutex 이걸 꼭 사용해야할까?

PubSub 라이브러리는 여러 Publisher와 Subscriber가 동시에 작동하는 환경을 지원해야한다. 이 과정에서 동시성 문제가 발생할 가능성이 크며, 이를 제어하지 않으면 데이터 레이스와 같은 심각한 문제가 발생할 수 있다.

Mutex를 사용하지 않았을 경우

  • 데이터 레이스 topics 맵은 Publish, Subscribe, Unsubscribe 메서드에서 동시에 접근된다. 예를들어, 한 Subscriber가 새로운 채널을 추가하려는 동안 다른 Publisher가 해당 토픽의 채널 목록을 읽거나 메시지를 전달하려고 하면, 데이터가 손상되거나 예외가 발생할 수 있다.
  • 예상치 못한 동작 Go 런타임에서 동시 접근으로 인해 일부 구독자가 등록되지 않거나 메시지를 수신하지 못할 수 있다. 특히, append와 같은 함수는 새로운 슬라이스를 생성하면서 기존 슬라이스를 덮어쓰기 때문에 한 메서드가 슬라이스를 변경하는 도중에 다른 메서드가 이를 읽으면 예기치 않은 값이 반환될 수 있다.
  • 프로그램 충돌 비정상적인 메모리 엑세스나 잘못된 포인터 참조로 인해 프로그램이 충돌하거나 패닉이 발생할 수 있다.

Mutex를 사용한 경우

  • 동시성 안전 보장 sync.RWMutex를 사용하여 읽기와 쓰기를 구분하고, 각 메서드에서 적절히 락을 적용하면 데이터 레이스를 방지할 수 있다. RLock은 읽기 작업이 병렬로 수행되도록 허용하며, Lock은 쓰기 작업을 단일로 처리하게 된다.
  • 데이터 무결성 보장 topics 맵의 상태가 항상 일광성을 유지한다. 모든 구독자와 발행자가 올바르게 등록 및 관리된다.
  • 안정적인 작동 여러 Publisher와 Subscriber가 동시에 작동하더라도 프로그램이 안정적으로 동작하며, 메시지가 정확히 전달된다.

Publish, Subscribe, Unsubscribe 메서드에서 사용한 락 종료

메서드 락 종류 이유
Publish RLock - 다중 읽기 작업을 지원 (Publish 작업은 동시에 실행 가능)
Subscribe Lock - 쓰기 작업 중 데이터 일관성을 보장
Unsubscribe Lock - 쓰기 작업 중 데이터 일관성을 보장

 

Subscribe에서 버퍼 채널을 사용한 이유는 뭘까?

버퍼가 없는 채널과 있는 채널은 동작 방식에 차이가 있다.

1) 버퍼가 없는 채널

Go에서 버퍼가 없는 채널은 동기적으로 동작한다. 즉, 데이터를 송신하는 쪽은 수신자가 준비될때까지 대기한다. 수신자가 준비되지 않았다면 송신은 블로킹된다. 기본적으로 채널은 블로킹 방식으로 동작한다.

그렇기 때문에 구독자가 메시지를 즉시 읽지 못하면 메시지를 발행하는 쪽이 블로킹된다. 다수의 메시지를 빠르게 발행하는 상황에서 전체 시스템이 멈출 수 있다.

2) 버퍼가 있는 채널

버퍼가 있는 채널은 비동기적으로 동작한다. 송신자는 버퍼가 가득 차지 않은 한 데이터를 채널에 비동기적으로 보낼 수 있다. 버퍼가 가득 찼을 때만 송신자가 블로킹된다.

따라서, 구독자가 메시지를 느리게 소비하더라도 Publisher는 일정 수준까지 메시지를 계속 발행할 수 있다.

4. 테스트 작성

구현 기능이 의도대로 동작하는지 검증하기 위한 테스트 코드를 작성한다.

package pubsub_test

import (
	"pubsub"
	"testing"
	"time"
)

func TestPubSub(t *testing.T) {
	ps := pubsub.NewPubSub()
	
	// topic1에 대해 구독을 설정
	sub := ps.Subscribe("topic1")
	
	// topic1에 메시지 발행
	ps.Publish("topic1", "First Message")
	
	// 첫 번째 메시지 수신 확인
	select {
		case msg := <-sub:
			if msg != "First Message" {
				t.Errorf("Expected 'First Message', got '%s'", msg)
			}
		case <-time.After(1 * time.Second):
			t.Errorf("Did not receive first message in time")
	}

	// 구독 해제
	ps.Unsubscribe("topic1", sub)

	// topic1에 메시지 발행 (구독 해제 이후)
	ps.Publish("topic1", "Second Message")

	// 두 번째 메시지는 수신되지 않아야 함
	select {
	case msg := <-sub:
		t.Errorf("Unexpectedly received message after unsubscribe: '%s'", msg)
	case <-time.After(500 * time.Millisecond);
		// 성공적으로 메시지를 수신하지 않음
	}
}

5. 고급 기능 추가

기본 구조가 잘 작동한다면 추가적으로 고려해볼 사항이다.

  • Priority Handling: 메시지에 우선순위를 부여하여 처리할 수 있다.
  • Message Persistence: 메시지를 디스크에 저장하여 영속성 있게 관리한다.
  • Dynamic Message: 전송되는 메시지를 다이나믹하게 변경할 수 있도록 한다.
  • Wildcard Topics: 와일드카드 구독 지원
반응형