카테고리 없음

Elasticsearch 로그 저장소 문제 해결 사례 V2

D.Y 2024. 11. 16. 22:55
반응형

로그 라이브러리 개선

현재 나의 프로젝트에서 다양한 서버에서 로그를 기록하기 위해 로그 라이브러리를 사용하여 엘라스틱서치에 직접 데이터를 전송하고 있다.

이 과정에서 동기적으로 동작하는 훅(Hook)을 통해 Elasticsearch에 로그를 기록하고 있었는데, 이로 인해 문제가 발생했다.

 

Elasticsearch에 장애가 발생하거나 응답이 지연되는 상황에서, 로그 전송 작업이 API의 주요 처리 흐름을 가로막는 문제가 나타난 것이다. 

 

결과적으로, 로그 라이브러리를 사용하는 모든 API에서 문제가 발생했고, 로그 전송이 API 응답 시간 초과를 유발하면서 시스템 전반에 장애가 발생했다.


문제 요약

  1. 로그 전송이 동기적으로 처리됨.
  2. 엘라스틱서치 장애 시 로그 훅이 블로킹되어 API의 정상적인 동작을 방해.
  3. 모든 API가 영향을 받으며, 특히 사용자 로그인 같은 주요 기능에서 문제 발생.

이러한 문제를 해결하기 위해, 로그 전송을 비동기적으로 처리하거나 타임아웃을 설정하여 엘라스틱서치 장애 상황에서도 시스템의 안정성과 응답 속도를 보장하는 개선이 필요하다.

 

먼저, 현재 상황을 테스트해보자.

응답을 10초 보다 늦어지는 상황을 어떻게 테스트할 수 있을까?

단순하게 응답이 10초 이상 걸리는 Mock 서버를 구현하고 테스트해보자.

간단하게 elasticsearch와 동일한 주소로 응답을 10초 후에 제공하는 Mock 서버를 구현한다.

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/api-log/_doc/", func(w http.ResponseWriter, r *http.Request) {

		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Failed to read request body", http.StatusInternalServerError)
		}
		defer r.Body.Close()

		fmt.Printf("Received request: %s\\n", body)

		time.Sleep(10 * time.Second)

		response := map[string]interface{}{
			"result": "created",
			"_index": "mock-index",
			"_id":    "mock-id",
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(response)
	})

	fmt.Println("Starting mock Elasticsearch server on :9200")
	http.ListenAndServe(":9200", nil)
}

 

Mock 서버를 실행하고 테스트코드를 동작시켜보니 동기적으로 처리되고 있음을 확인할 수 있었다.

 

문제가 발생한 로그라이브러리 코드는 아래와 같다. 차근차근 부족한 부분을 개선해보자.

package hooks

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
	"github.com/sirupsen/logrus"
	"time"
)

// ElasticsearchHook logrus 사용하여 Elasticsearch 로그를 기록하는 커스텀 Hook 입니다.
type ElasticsearchHook struct {
	client      *elastic.Client
	serviceName string
}

// NewElasticsearchHook ElasticsearchHook 인스턴스 생성 합니다.
func NewElasticsearchHook(url string, serviceName string) *ElasticsearchHook {
	return &ElasticsearchHook{
		client:      newElasticInstance(url),
		serviceName: serviceName,
	}
}

func newElasticInstance(url string) *elastic.Client {
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(10*time.Second),
		elastic.SetHealthcheck(false),
	)
	if err != nil {
		return nil
	}
	return client
}

func (hook *ElasticsearchHook) Fire(entry *logrus.Entry) error {
	logMessage := entry.Message
	logFields := entry.Data

	bodyMessage := map[string]interface{}{
		"message":     logMessage,
		"fields":      logFields,
		"level":       entry.Level,
		"serviceName": hook.serviceName,
		"createdAt":   time.Now().Format(time.RFC3339),
	}

	_, err := hook.client.Index().
		Index(indexName).
		BodyJson(bodyMessage).
		Do(ctx.Background())

	if err != nil {
		fmt.Printf("Failed to send log to Elassicsearch: %v\\n", err)
	}

	return err
}

func (hook *ElasticsearchHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

외부 API 호출시 timeout 시간 설정

외부 API 호출 시 타임아웃(timeout) 시간을 설정하는 이유는 아래와 같다.

  • 응답 지연으로 인한 자원 낭비 방지 API 호출이 오랜시간 응답하지 않으면, 호출한 애플리케이션의 스레드 또는 연결이 대기 상태로 남아 자원이 낭비된다.
  • 시스템 안정성 유지 외부 API가 불안정하거나 장애가 발생했을때, 타임아웃 없이 무기한 대기하면 애플리케이션 전체에 영향을 줄 수 있다.
  • 사용자 경험 개선 사용자 요청에 대한 응답이 너무 걸리면 불편함을 초래한다.

따라서, elasticsearch가 호출되는 훅에 context를 활용하여 timeout을 설정한다.

 

timeout을 적용한 코드 (타임아웃은 3초로 설정한다.)

func (hook *ElasticsearchHook) Fire(entry *logrus.Entry) error {
	logMessage := entry.Message
	logFields := entry.Data

	bodyMessage := map[string]interface{}{
		"message":     logMessage,
		"fields":      logFields,
		"level":       entry.Level,
		"serviceName": hook.serviceName,
		"createdAt":   time.Now().Format(time.RFC3339),
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	_, err := hook.client.Index().
		Index(indexName).
		BodyJson(bodyMessage).
		Do(ctx)

	if err != nil {
		fmt.Printf("Failed to send log to Elassicsearch: %v\\n", err)
	}

	return err
}

 

Mock 서버를 통해서 테스트를 진행한 결과 3초 뒤에 타임아웃이 잘 동작하는 것을 확인한다.

로깅처리하는 부분을 비동기적으로 처리하도록 변경

현재 로깅처리(Fire 메서드)는 동기적으로 실행되므로 엘라스틱서치가 지연되면 API 응답도 지연된다.

이를 방지하기 위해 비동기 로깅 처리를 도입하면 API 응답과 로그 저장을 분리할 수 있다.

 

비동기 로깅은 고루틴과 채널을 사용해 로그 처리 작업을 API의 실행 흐름과 분리한다.

package hooks

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
	"github.com/sirupsen/logrus"
	"time"
)

const BufferSize = 200

type ElasticsearchHook struct {
	client      *elastic.Client
	serviceName string
	logChannel  chan *logrus.Entry
}

func NewElasticsearchHook(url string, serviceName string) *ElasticsearchHook {
	hook := &ElasticsearchHook{
		client:      newElasticInstance(url),
		serviceName: serviceName,
		logChannel:  make(chan *logrus.Entry, BufferSize),
	}

	// 고루틴(Goroutine)을 활용하여 비동기로 로깅 작업 수행
	go hook.processLogs()

	return hook
}

func newElasticInstance(url string) *elastic.Client {
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(10*time.Second),
		elastic.SetHealthcheck(false),
	)
	if err != nil {
		return nil
	}
	return client
}

func (hook *ElasticsearchHook) Fire(entry *logrus.Entry) error {
	select {
	case hook.logChannel <- entry:
		// 로그 메시지를 채널로 전달한 후 별도의 워커가 처리
	default:
		fmt.Println("ElasticsearchHook buffer is full")
	}
	return nil
}

func (hook *ElasticsearchHook) processLogs() {
	for entry := range hook.logChannel {
    // 실제 엘라스틱서치에 요청을 보내는 워커
		hook.syncFire(entry)
	}
}

func (hook *ElasticsearchHook) syncFire(entry *logrus.Entry) error {
	logMessage := entry.Message
	logFields := entry.Data

	bodyMessage := map[string]interface{}{
		"message":     logMessage,
		"fields":      logFields,
		"level":       entry.Level,
		"serviceName": hook.serviceName,
		"createdAt":   time.Now().Format(time.RFC3339),
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	_, err := hook.client.Index().
		Index(indexName).
		BodyJson(bodyMessage).
		Do(ctx)

	if err != nil {
		fmt.Printf("Failed to send log to Elassicsearch: %v\\n", err)
	}

	return err
}

func (hook *ElasticsearchHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

 

결론

현재 프로젝트에서 발생한 엘라스틱서치(Elasticsearch) 로그 훅 처리의 문제는 동기적으로 동작하는 로깅 방식이 주요 원인이었다.

엘라스틱서치가 장애를 겪거나 응답 지연이 발생했을 때, 동기적 처리로 인해 로그 훅이 API의 흐름을 가로막아 모든 API 호출이 영향을 받았고, 이는 시스템 전반의 장애를 초래했다.

 

이 문제를 해결하기 위해 다음과 같은 개선 작업을 수행했다.

  1. 타임아웃 적용:
    • context.WithTimeout을 활용하여 외부 API 호출에 타임아웃을 설정.
    • 엘라스틱서치가 일정 시간 내에 응답하지 않으면 요청을 중단하고, API 흐름이 지연되지 않도록 구현.
    • 타임아웃을 3초로 설정하여 엘라스틱서치의 장애 상황에서도 API 안정성을 유지.
  2. 비동기 로깅 도입:
    • 고루틴과 채널을 활용해 로그 전송을 비동기적으로 처리.
    • 로그 메시지는 채널에 적재되고, 별도의 워커가 이를 비동기적으로 엘라스틱서치에 전송하도록 변경.
    • API 응답과 로그 처리의 흐름을 분리하여, 로그 처리 지연이 API 응답 시간에 영향을 미치지 않도록 구현.
  3. 버퍼 크기 설정:
    • 채널의 버퍼 크기를 200으로 설정하여 순간적인 로깅 부하를 흡수할 수 있도록 개선.
    • 채널이 가득 찬 경우 로그 유실을 허용하면서도 시스템의 안정성을 우선 보장.

결과

  • 개선된 구조는 엘라스틱서치 장애 상황에서도 API가 정상적으로 작동하도록 보장한다.
  • 로그 전송 작업이 비동기적으로 처리됨에 따라 API 응답 시간이 지연되지 않으며, 사용자 경험도 향상되었다. 
  • 타임아웃을 통해 엘라스틱서치 응답 지연이나 장애로 인해 자원이 낭비되는 것을 방지할 수 있었다.
반응형