본문 바로가기

Golang

동시성 프로그래밍 in Go

동시성이란?


“Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.”   - Rob Pike

동시성(concurrency)은 한 번에 많은 것 들을 처리하는 것이고 병렬성(parallelism)은 한 번에 많은 것 들을 하는 것입니다. 일반적인 예를 들면, 우리는 조깅을 하다가 신발 끈이 풀렸을 때 잠시 멈춰서 신발끈을 고쳐 메고 다시 달립니다. 우리는 조깅과 신발끈 묶기를 함께 처리할 수 있지만 신발끈을 묶으면서 달리진 못합니다. 이것이 동시성입니다. 반면에 음악을 들으면서 조깅을 할 수 있습니다. 이것이 병렬성입니다. 


프로그래밍에 관점에서 설명을 하면, 동시 프로그래밍은 독립적으로 실행 중인 프로세스들을 조합하여 처리하도록 합니다. 반면에 병렬 프로그래밍은 여러 연산들이 동시에 실행되게 합니다. 동시성을 이용하여 병렬 문제를 해결 할 수도 있지만, 그것이 목적은 아닙니다. 동시성은 구조에 관한 것입니다. 다른 관점에서 동시성은 코드의 속성이고 병렬성은 프로그램의 속성이라고 할 수 있습니다. 고루틴을 통해 동시성 코드로 작성된 프로그램이 실제로는 병렬로 동작할 수도 안 할 수 도 있습니다.

 

병렬성과 동시성에 관한 자세한 구분은 여기를 참조해주세요.



의사소통을 통한 공유 (Share by communicating)


많은 환경에서 동시성 프로그래밍은 공유 변수 접근을 정확하게 구현(뮤텍스, 세마포어 등) 하는데에 대한 복잡함과 어려움이 있습니다.  Go언어는 채널을 통하여 공유 변수를 전달하는 다른 접근법을 장려합니다. 한번에 하나의 고루틴만이 변수에 접근 할 수 있습니다. 따라서 설계 상 race condition이 발생하지 않습니다.  이러한 특성 때문에 Go 진영에서는 아래와 같은 슬로건이 있습니다.

Do not communicate by sharing memory; instead, share memory by communicating.


이러한 접근법은 너무 지나친 것이라 생각할 수도 있습니다. 예를 들어, 동시에 참조 되는 카운터를 구현 할 때 integer 변수에 뮤텍스를 사용함으로써 해결하는 심플한 방법이 있습니다.  그러나 높은 레벨에서 채널을 이용한 접근 제어는 분명하고 정확한 프로그램을 쉽게 만들 수 있도록 합니다.


참고) Go의 동시성에 대한 접근은 Hoare의 Communicating Sequential Processes (CSP)에서 비롯되었습니다.



고루틴 (Goroutine)


고루틴은 같은 주소 공간에서 다른 고루틴들과 동시에 실행되는 함수입니다. 여기서 주소 공간이란 프로세스의 메모리 공간에 관한 것입니다. 

고루틴은 OS의 스레드보다 가벼운 개념입니다. 고루틴들은 여러개의 스레드에 다중화(multiplex) 됩니다. 하나의 스레드 내에서는 하나의 고루틴이 실행되고 실행중인 고루틴이 블록되면 또 다른 고루틴이 스레드에서 실행됩니다. 이러한 고루틴의 스케쥴링과 OS로부터의 스레드 할당은 Go의 런타임이 대신 하기 때문에 개발자는 스레드 관리를 신경 쓰지 않아도 됩니다. 


함수 앞에 go 키워드를 붙임으로써 고루틴을 실행시킬 수 있습니다. 호출이 완료되면 고루틴은 조용하게 종료됩니다.


go list.Sort()  // run list.Sort concurrently; don't wait for it.


함수 리터럴은 고루틴 호출에 유용할 수 있습니다.


func Announce(message string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println(message)
    }()  // Note the parentheses - must call the function.
}


Go에서 함수 리터럴은 클로저입니다. 따라서 함수에 의해 참조된 변수들은 함수들이 활성화되는 동안 생존할 수 있습니다. 위 예제에서 고루틴으로 실행되는 익명함수에서 자신의 범위 밖의 변수인 delaymessage를 참조하고 있습니다.


이러한 예제들은 고루틴 실행 후 완료 됬음을 알려줄 방법이 없으므로 실용적이지 않습니다. 이를 위해서는 채널이 필요합니다.



채널 (Channel)


채널은 make로 할당되고 make의 리턴 값은 데이터 구조의 참조값으로 동작합니다. 선택 값인 두번째 정수 파라미터를 입력하면 채널의 버퍼사이즈가 세팅됩니다. 디폴트는 버퍼 사이즈가 0인 동기화 채널로 할당됩니다.


ci := make(chan int)            // unbuffered channel of integers
cj := make(chan int, 0)         // unbuffered channel of integers
cs := make(chan *os.File, 100)  // buffered channel of pointers to Files


이전 예제에서는 sort 함수를 백그라운드에서 실행하기만 했습니다. 채널을 이용하면 고루틴이 완료되기까지 기다릴 수 있습니다.


c := make(chan int)  // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1  // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c   // Wait for sort to finish; discard sent value.


수신자는 데이터를 수신할 때 까지 블록됩니다. unbuffered 채널(동기화 채널)인 경우,  송신자 또한 수신자가 버퍼를 수신할 때 까지 블록됩니다. 버퍼를 가진 채널인 경우, 송신자는 보내려는 값이 버퍼로 복사 될때까지만 블록됩니다. 만약 버퍼가 꽉 차게되면 수신자가 채널에서 값을 가져갈때 까지 송신을 대기합니다.


buffered 채널은 세마포어처럼 사용될 수 있습니다. 예를들어 한번에 처리될 수 있는 인스턴스 수를 제한하는 서버를 구현해보겠습니다. 들어온 요청은 handle로 채널을 통해 전달되고, sem 채널 버퍼를 하나 채운 다음 요청을 처리하고 다시 버퍼를 하나 비웁니다. 이 과정에서 버퍼가 다 차면 버퍼의 공간이 생길 때 까지 대기해야하므로 버퍼의 사이즈 만큼 요청에 대한 동시 처리 수를 제한 할 수 있습니다.


var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // Wait for active queue to drain.
    process(r)  // May take a long time.
    <-sem       // Done; enable next request to run.
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // Don't wait for handle to finish.
    }
}


위와 같은 디자인에는 문제가 있습니다. 요청이 들어올 때마다 Serve 함수는 새로운 고루틴을 생성합니다. 요청이 매우 빠르게 들어오는 경우, 비록 한번에 MaxOutstanding만큼 요청이 처리되지만 handle 고루틴은 제한 없이 생성됩니다. 이 문제는 아래와 같이 해결할 수 있지만 아직도 해결해야 할 버그가 존재합니다.


func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // Buggy; see explanation below.
            <-sem
        }()
    }
}


버그는 for 루프 안에 있습니다. Go에서 익명함수는 클로저이기 때문에 모든 고루틴에서 req 변수를 공유하게 됩니다. 한가지 해결방법은 고루틴의 인수로 req를 전달하는 것입니다.


func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // Buggy; see explanation below.
            <-sem
        }(req)
    }
}


또 다른 해결 방법으로 동일한 이름으로 새로운 변수를 할당하는 방법이 있습니다. 


func Serve(queue chan *Request) {
    for req := range queue {
        req := req // Create new instance of req for the goroutine.
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}


이러한 표현은 매우 이상해보일 수 있습니다.


req := req


하지만 이는 Go 문법에 맞는 표현입니다. 루프가 돌때마다 동일한 이름의 새 변수가 기존 변수를 지역적으로 가림으로써, 각 고루틴에는 유니크한 변수가 세팅됩니다.



채널의 채널


Go의 채널은 변수로 할당되고 파라미터로 전달 될 수 있는 일급변수(first-class value)의 특성을 가집니다. 이러한 특성은 안전한 병렬 역다중화를 구현하는데 사용됩니다. (역다중화 : 서버가 여러 클라이언트에서 받은 요청을 처리하고 각 클라이언트에게 응답을 보내는 것을 말함)


앞에서 다루었던 handle 예제에서 요청의 타입을 정의하지 않았습니다. Request 채널이 응답해야 하는 채널을 가지고 있다면, 각 클라이언트가 요청을 보낼 때 응답 받고자 하는 경로를 지정할 수 있습니다.


type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}


클라이언트는 응답 채널과 함수와 인자를 지정할 수 있습니다.


func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response.
fmt.Printf("answer: %d\n", <-request.resultChan)


서버 측에서 핸들러 함수는 요청 채널 내의 인자를 함수에 적용하여 응답 채널로 보내게 됩니다.


func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}



병렬처리 (Parallelization) 


채널과 고루틴을 이용하여 mutiple CPU 코어 환경에서 병렬 계산을 수행할 수 있습니다. 아래 예제는 연산을 코어 갯수로 나누어 독립적으로 수행하고 각 연산이 완료되면 채널로 완료 신호를 보냅니다.


const numCPU = 4 // number of CPU cores

type Vector []float64

// Apply the operation to v[i], v[i+1] ... up to v[n-1].
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for ; i < n; i++ {
        v[i] += u.Op(v[i])
    }
    c <- 1    // signal that this piece is done
}

func (v Vector) DoAll(u Vector) {
    c := make(chan int, numCPU)  // Buffering optional but sensible.
    for i := 0; i < numCPU; i++ {
        go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
    }
    // Drain the channel.
    for i := 0; i < numCPU; i++ {
        <-c    // wait for one task to complete
    }
    // All done.
}


CPU의 갯수를 직접 지정하지 않고 런타임에 요청하여 알 수 있습니다. runtime.NumCPU 함수는 하드웨어 CPU 코어 갯수를 리턴합니다.


var numCPU = runtime.NumCPU()


컴포넌트들을 독립적으로 실행함으로써 프로그램을 구조화하는 동시성과 multiple CPU에서 효율성을 위한 계산을 병렬 수행하는 병렬성을 혼동하면 안됩니다. Go의 동시성 특성을 이용하여 병렬 계산을 쉽게 구조화 할 수 있지만, Go는 동시성 언어이기 때문에 병렬화 문제가 Go의 모델에 맞지 않을 수 있습니다. 



Leaky buffer


비동시성도 동시성 프로그래밍의 도구로 쉽게 표현될 수 있습니다. 다음은 leaky buffer를 사용한 RPC 패키지를 추상화한 예제입니다. (leaky buffer란 밑빠진 독처럼 한 곳에서 데이터를 버퍼에 채우고 다른 한 곳에서는 버퍼의 데이터를 가져 가는 버퍼를 말함) 


클라이언트 고루틴은 루프에서 네트워크로 부터 데이터를 수신합니다. 통신마다 버퍼의 할당과 해제하는 일을 피하기 위해서 freeList 란 buffered 채널을 사용합니다. 채널이 비어있으면 새로운 버퍼를 할당하고 메시지 버퍼가 준비되면 서버로 전달하는 serverChan으로 보냅니다.


var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
    for {
        var b *Buffer
        // Grab a buffer if available; allocate if not.
        select {
        case b = <-freeList:
            // Got one; nothing more to do.
        default:
            // None free, so allocate a new one.
            b = new(Buffer)
        }
        load(b)              // Read next message from the net.
        serverChan <- b      // Send to server.
    }
}


서버에서는 클라이언트로 받은 메시지를 수신하고 처리한 뒤에 freeList 버퍼를 반환합니다.


func server() {
    for {
        b := <-serverChan    // Wait for work.
        process(b)
        // Reuse buffer if there's room.
        select {
        case freeList <- b:
            // Buffer on free list; nothing more to do.
        default:
            // Free list full, just carry on.
        }
    }
}


클라이언트는 freeList부터 버퍼를 가져오기를 시도합니다. 가져올 버퍼가 없으면 새로운 버퍼를 할당합니다. 서버에서는 리스트의 버퍼가 다 차면  default 절에서 아무 것도 처리하지 않는다. 이렇게 리스트에 들어가지 못한 버퍼는 나중에 garbage collector에 의해 식별되어 메모리가 해제됩니다. (select 문에서 default 절은 다른 케이스가 준비되지 않을 때 수행되며, default 케이스가 있으면 select는 블록되지 않습니다.) 



바쁘지 않으면 직접 처리하라


아래 예제는 단순한 요청을 처리하는 웹서버를 작성한 것입니다.


package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	for {
	}
}


하지만 이 프로그램은 의도한 바와 다르게 웹 요청을 처리하는 것과 동시에 다른 일을 하고 있습니다. 바로 for문 내에서 무한 루프를 돌면서 CPU를 낭비하고 있습니다. Go 런타임은 고루틴을 협조적으로 스케쥴링 하기 때문에 두개의 고루틴(main, http server)이 쓸데없이 번갈아가면서 동작할 것입니다. 


아래와 같은 방법으로 이를 해결 할 수 있습니다.


func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	for {
		runtime.Gosched() // 다른 고루틴에게 프로세서를 양보하는 함수 
	}
}


나이스해보이진 않지만 실제로 흔히 보이는 해결책입니다. 하지만 이는 증상에 대한 처리이지 근본적인 문제를 해결한 것은 아닙니다. 


Go에 조금 더 경험이 있으신 분이면 대신에 아래처럼 작성하실 수 있을겁니다. 


func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	select {}
}


비어 있는 select 문은 영원히 블록됩니다. 이제는 고작 runtime.GoSched()을 호출하기 위해서 번갈아 동작하는 일이 발생하지 않을 것입니다. 하지만 아직도 증상에 대한 치료이지 근본 원인을 해결하지는 못했습니다.


현재 고루틴이 다른 결과를 얻을 때까지 진행할 수 없다면, 그것을 새로운 고루틴을 통해 위임하는 것보다 직접 수행하는 것이 더 낫습니다. 직접 수행하면 결과를 리턴 받기 위한 채널 조작이나 진행 상태 확인 등을 제거하여 구조가 단순해집니다.


func main() { // http요청 처리를 위임하지 않고 직접 처리한다.
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}



호출자에서 동시성 유지


아래 두가지 형태의 API가 있습니다. 하나는 리턴값이 슬라이스이고, 다른 하나는 채널을 리턴합니다.


// ListDirectory returns the contents of dir.
func ListDirectory(dir string) ([]string, error)
// ListDirectory returns a channel over which
// directory entries will be published. When the list
// of entries is exhausted, the channel will be closed.
func ListDirectory(dir string) chan string


첫 번째 API는 디렉토리 전체를 읽어서 슬라이스로 리턴합니다. 따라서 디렉토리가 클수록 수행 시간이 오래 걸립니다. 반면에 두 번째 API는 채널을 통해 디렉토리를 하나씩 전달하기 때문에 고루틴으로 수행 될 것이, 따라서 블록 없이 다음 흐름을 진행 할 수 있습니다. 


하지만 채널 버전의 API는 두가지 문제를 가집니다.

  • 디렉토리 스캔 중에 에러가 발생했을 때 호출자에게 알려줄 방법이 없습니다. 디렉토리가 비어 있을 때와 에러가 발생했을 때를 구분할 방법이 없습니다. 두 경우 모두 채널이 닫히면서 끝나기 때문입니다.
  • 호출자는 채널이 닫힐 때 까지 계속해서 읽어야 합니다. 비록 원하는 디렉토리를 찾았더라도 채널을 닫기 위해선 끝까지 확인해야 합니다. (호출자 측에서 채널을 강제로 닫는 방법은 논외) 슬라이스를 이용한 API보다 메모리 사용 측면에서 효율적이지만, 결코 더 빠르진 않습니다.

위의 두 가지 API의 문제점에 대한 해결책은 콜백을 사용하는 것입니다. 전달된 함수가 각 디렉토리마다 수행되기 때문에 원하는 디렉토리를 찾았을 때 함수 호출을 종료하도록 구현 할 수 있고 고루틴을 통해 비동기로 수행 될 수 있습니다.


func ListDirectory(dir string, fn func(string))



References