icon

안동민 개발노트

4장 : 스레드

스레드 프로그래밍


이론으로 이해한 스레드를 코드로 직접 다루어 보겠습니다. C의 POSIX Threads(pthread)와 Python의 threading 모듈을 사용하여, 스레드 생성·종료·합류, 반환 값 수집, 동기화 기초, 스레드 풀을 실습합니다.


C — pthread 기본

스레드 생성과 합류

pthread_basic.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void *thread_func(void *arg) {
    int id = *(int *)arg;
    printf("Thread %d running (tid=%lu)\n", id, (unsigned long)pthread_self());
    return NULL;
}

int main() {
    pthread_t threads[3];
    int ids[3] = {1, 2, 3};

    for (int i = 0; i < 3; i++) {
        int ret = pthread_create(&threads[i], NULL, thread_func, &ids[i]);
        if (ret != 0) {
            fprintf(stderr, "pthread_create failed: %d\n", ret);
            exit(1);
        }
    }

    for (int i = 0; i < 3; i++) {
        pthread_join(threads[i], NULL);
    }

    printf("All threads done\n");
    return 0;
}

pthread_create()의 네 가지 인자를 정리하겠습니다. 첫 번째는 스레드 식별자를 저장할 pthread_t 포인터, 두 번째는 스레드 속성(NULL이면 기본값), 세 번째는 스레드가 실행할 함수(void* (*)(void*)), 네 번째는 함수에 전달할 인자입니다.

pthread_join()은 해당 스레드가 종료될 때까지 호출 스레드를 블로킹합니다. join() 없이 main()이 끝나면 프로세스가 종료되어, 아직 실행 중인 스레드도 강제로 사라집니다. 이것은 5장에서 다룰 좀비 프로세스처럼 자원 반환이 이루어지지 않을 수 있습니다.

컴파일
gcc -o thread_basic pthread_basic.c -pthread

-pthread 플래그는 POSIX 스레드 라이브러리를 링크합니다. -lpthread와 비슷하지만, 컴파일 단계에서도 스레드 관련 전처리 매크로를 활성화하므로 -pthread를 권장합니다.

스레드에서 결과 받기

스레드 함수가 반환하는 void*pthread_join()의 두 번째 인자로 받을 수 있습니다.

pthread_return.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void *compute(void *arg) {
    int input = *(int *)arg;
    int *result = malloc(sizeof(int));
    *result = input * input;
    return result;  /* 힙에 할당한 결과를 반환 */
}

int main() {
    pthread_t tid;
    int input = 7;
    void *retval;

    pthread_create(&tid, NULL, compute, &input);
    pthread_join(tid, &retval);

    int *result = (int *)retval;
    printf("7^2 = %d\n", *result);  /* 49 */

    free(result);  /* 스레드가 할당한 메모리를 해제 */
    return 0;
}

주의할 점: 스레드 함수 안의 지역 변수 주소를 반환하면 안 됩니다. 스레드가 종료되면 그 스레드의 스택이 해제되므로, 반환된 포인터는 댕글링 포인터(dangling pointer)가 됩니다. 반드시 malloc()으로 힙에 할당하거나, 전역·정적 변수를 사용해야 합니다.

인자 전달의 함정

흔한 실수를 살펴보겠습니다.

잘못된 인자 전달 (위험)
/* 이렇게 하면 안 됩니다 */
for (int i = 0; i < 3; i++) {
    pthread_create(&threads[i], NULL, thread_func, &i);
    /* 모든 스레드가 같은 &i를 공유!
       i가 변경되면 스레드가 잘못된 값을 읽을 수 있음 */
}

루프 변수 i의 주소를 직접 전달하면, 스레드가 실행되는 시점에 i가 이미 변경되어 있을 수 있습니다. 앞의 올바른 예제처럼 별도 배열에 값을 복사하거나, (void*)(intptr_t)i로 값 자체를 캐스팅하여 전달합니다.

detach vs join

pthread_join()은 스레드가 끝날 때까지 기다리고 자원을 회수합니다. 만약 스레드의 결과가 필요 없고, 끝나면 자동으로 자원이 해제되기를 원한다면 pthread_detach()를 사용합니다.

detached 스레드
pthread_t tid;
pthread_create(&tid, NULL, background_task, NULL);
pthread_detach(tid);
/* 이후 join 불가, 스레드 종료 시 자동 자원 회수 */

detached 스레드는 백그라운드 작업(로그 기록, 통계 수집 등)에 적합합니다. 단, detached 스레드는 main()이 먼저 종료되면 함께 종료되므로, 필요하다면 main()이 끝나지 않도록 해야 합니다.


Python — threading 모듈

기본 스레드 생성

threading_basic.py
import threading
import time

def worker(name, delay):
    tid = threading.current_thread().ident
    print(f"Thread {name} starting (tid={tid})")
    time.sleep(delay)
    print(f"Thread {name} done")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i, 1))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("All threads done")

Python의 threading.Threadtarget으로 실행할 함수, args로 위치 인자, kwargs로 키워드 인자를 전달합니다. start()를 호출해야 스레드가 실행되고, join()으로 종료를 기다립니다.

GIL의 영향 — CPU 바운드 vs I/O 바운드

CPython의 GIL(Global Interpreter Lock)은 한 번에 하나의 스레드만 Python 바이트코드를 실행하도록 강제합니다. 이것이 실질적으로 의미하는 바를 실험으로 확인해 보겠습니다.

gil_test.py
import threading
import time

def cpu_bound(n):
    """CPU 연산 집약적 작업"""
    total = 0
    for i in range(n):
        total += i * i
    return total

def io_bound(seconds):
    """I/O 대기 작업"""
    time.sleep(seconds)

# CPU 바운드: 싱글 스레드
start = time.time()
cpu_bound(10_000_000)
cpu_bound(10_000_000)
print(f"CPU 바운드 순차: {time.time() - start:.2f}s")

# CPU 바운드: 멀티 스레드
start = time.time()
t1 = threading.Thread(target=cpu_bound, args=(10_000_000,))
t2 = threading.Thread(target=cpu_bound, args=(10_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"CPU 바운드 2스레드: {time.time() - start:.2f}s")

# I/O 바운드: 싱글 스레드
start = time.time()
io_bound(1); io_bound(1)
print(f"I/O 바운드 순차: {time.time() - start:.2f}s")

# I/O 바운드: 멀티 스레드
start = time.time()
t1 = threading.Thread(target=io_bound, args=(1,))
t2 = threading.Thread(target=io_bound, args=(1,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"I/O 바운드 2스레드: {time.time() - start:.2f}s")
실행 결과 예시
CPU 바운드 순차: 2.10s
CPU 바운드 2스레드: 2.35s     ← 더 느림!  (GIL 경합 + 전환 오버헤드)
I/O 바운드 순차: 2.00s
I/O 바운드 2스레드: 1.01s     ← 2배 빠름! (I/O 대기 중 GIL 해제)

CPU 바운드에서는 멀티스레드가 오히려 약간 느립니다. GIL을 두 스레드가 번갈아 잡으면서 경합(contention)과 전환 비용이 발생하기 때문입니다. I/O 바운드에서는 time.sleep()이나 socket.recv() 같은 I/O 함수가 호출될 때 GIL이 풀리므로, 두 스레드가 동시에 I/O를 대기할 수 있어 시간이 절반으로 줄어듭니다.

CPU 바운드 병렬 처리에는 multiprocessing을 사용합니다.

multiprocessing_example.py
from multiprocessing import Pool

def cpu_bound(n):
    total = 0
    for i in range(n):
        total += i * i
    return total

with Pool(processes=4) as pool:
    results = pool.map(cpu_bound, [10_000_000] * 4)
    print(sum(results))

multiprocessing은 별도의 프로세스를 생성하므로 GIL의 영향을 받지 않습니다. 각 프로세스가 독립된 Python 인터프리터와 메모리 공간을 가집니다.

데몬 스레드

C의 pthread_detach()에 해당하는 Python의 개념이 데몬 스레드입니다. 데몬 스레드는 메인 스레드가 종료되면 자동으로 종료됩니다.

daemon_thread.py
import threading
import time

def background_monitor():
    while True:
        print("모니터링 중...")
        time.sleep(2)

t = threading.Thread(target=background_monitor, daemon=True)
t.start()

time.sleep(5)
print("메인 종료")
# 메인이 끝나면 데몬 스레드도 자동 종료

데몬 스레드는 daemon=True로 설정합니다. 메인 프로세스가 종료될 때 데몬 스레드는 정리(cleanup) 없이 즉시 종료되므로, 파일 쓰기 중간에 끊어질 수 있습니다. 클린업이 필요한 작업에는 일반 스레드 + Event 플래그로 종료 신호를 보내는 것이 안전합니다.


스레드 풀

왜 스레드 풀이 필요한가

매 요청마다 스레드를 생성하고 파괴하면 두 가지 문제가 있습니다. 첫째, 생성/파괴 비용이 누적됩니다. 스레드 하나 생성에 수십~수백 마이크로초가 걸리는데, 초당 수천 요청이면 이것만으로도 상당한 오버헤드입니다. 둘째, 동시에 만들 수 있는 스레드 수에 제한이 없으면, 트래픽 폭주 시 수만 개의 스레드가 생성되어 메모리가 고갈되고 시스템이 다운됩니다.

스레드 풀(Thread Pool)은 이 두 문제를 해결합니다. 미리 일정 수의 스레드를 생성해 두고, 작업이 오면 유휴 스레드에 할당합니다. 모든 스레드가 바쁘면 작업은 큐에서 대기합니다.

Python — ThreadPoolExecutor

thread_pool.py
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url):
    """URL 내용을 가져오는 I/O 바운드 작업 (시뮬레이션)"""
    time.sleep(1)  # 네트워크 지연 시뮬레이션
    return f"{url} 완료 ({len(url)} bytes)"

urls = [f"https://example.com/page/{i}" for i in range(8)]

with ThreadPoolExecutor(max_workers=4) as executor:
    # submit()은 Future 객체를 반환
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}

    # 완료되는 순서대로 결과 수집
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        result = future.result()
        print(f"{url}{result}")

ThreadPoolExecutor(max_workers=4)는 4개의 워커 스레드 풀을 생성합니다. 8개의 작업을 제출하면, 4개가 먼저 각 스레드에 할당되어 실행됩니다. 하나가 끝날 때마다 큐에서 다음 작업을 가져와 실행합니다. as_completed()는 완료되는 순서대로 Future를 반환하므로, 빨리 끝난 결과부터 처리할 수 있습니다.

with 블록을 벗어나면 executor.shutdown(wait=True)가 자동으로 호출되어, 모든 작업이 완료될 때까지 기다립니다.

C — 간단한 스레드 풀 구조

C에서 스레드 풀은 직접 구현해야 합니다. 핵심 구조는 작업 큐 + 조건 변수 + 뮤텍스입니다.

thread_pool_concept.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define POOL_SIZE 4
#define QUEUE_SIZE 16

typedef struct {
    void (*function)(void *);
    void *arg;
} task_t;

typedef struct {
    task_t queue[QUEUE_SIZE];
    int head, tail, count;
    pthread_mutex_t lock;
    pthread_cond_t not_empty;
    pthread_cond_t not_full;
    int shutdown;
    pthread_t workers[POOL_SIZE];
} thread_pool_t;

void *worker_thread(void *arg) {
    thread_pool_t *pool = (thread_pool_t *)arg;

    while (1) {
        pthread_mutex_lock(&pool->lock);

        while (pool->count == 0 && !pool->shutdown) {
            pthread_cond_wait(&pool->not_empty, &pool->lock);
        }
        if (pool->shutdown) {
            pthread_mutex_unlock(&pool->lock);
            break;
        }

        task_t task = pool->queue[pool->head];
        pool->head = (pool->head + 1) % QUEUE_SIZE;
        pool->count--;
        pthread_cond_signal(&pool->not_full);
        pthread_mutex_unlock(&pool->lock);

        task.function(task.arg);  /* 작업 실행 */
    }
    return NULL;
}

워커 스레드는 큐가 비어 있으면 pthread_cond_wait()으로 대기합니다. 작업이 큐에 추가되면 pthread_cond_signal()로 워커 하나를 깨웁니다. 뮤텍스로 큐 접근을 보호합니다. 이 패턴은 6장의 동기화에서 더 자세히 다룹니다.

풀 크기 결정 가이드라인

스레드 풀의 크기를 어떻게 정해야 할까요? 유명한 공식이 있습니다.

CPU 바운드: 스레드 수 = CPU 코어 수. 코어보다 많은 스레드를 만들어도 컨텍스트 스위칭 비용만 증가합니다.

I/O 바운드: 스레드 수 = CPU 코어 수 × (1 + 대기시간/처리시간). 예를 들어, 4코어에서 타스크의 대기/처리 비율이 9:1이라면, 4 × (1 + 9) = 40개의 스레드가 적합합니다.

실무에서는 이 공식을 시작점으로 삼고, 부하 테스트로 조정합니다. 스레드 수가 너무 적으면 CPU가 놀고, 너무 많으면 컨텍스트 스위칭과 메모리가 낭비됩니다.


스레드 안전성 미리보기

멀티스레드 코드에서 가장 빠지기 쉬운 함정을 미리 경험해 봅시다.

race_condition.py
import threading

counter = 0

def increment():
    global counter
    for _ in range(1_000_000):
        counter += 1  # 이 한 줄이 원자적이지 않음

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start(); t2.start()
t1.join(); t2.join()

print(f"기대값: 2000000, 실제값: {counter}")

counter += 1은 Python 수준에서 한 줄이지만, 바이트코드로는 LOAD_GLOBAL counter → LOAD_CONST 1 → BINARY_ADD → STORE_GLOBAL counter의 네 단계입니다. 두 스레드가 동시에 LOAD → ADD → STORE를 수행하면, 하나의 증가가 덮어써집니다. 실행할 때마다 결과가 달라지는 경합 조건(Race Condition)입니다.

이것을 해결하려면? 6장에서 다룰 뮤텍스(Mutex)를 사용합니다.

race_condition_fixed.py
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(1_000_000):
        with lock:
            counter += 1

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start(); t2.start()
t1.join(); t2.join()

print(f"기대값: 2000000, 실제값: {counter}")
# 항상 2000000

with lock:이 임계 구역(Critical Section)을 보호합니다. lock.acquire() → 코드 실행 → lock.release()를 자동으로 처리합니다. 한 스레드가 락을 잡고 있는 동안 다른 스레드는 대기합니다. 이것이 6장의 핵심 주제입니다.


실무 패턴 정리

패턴사용 시점예시
스레드 직접 생성간단한 병렬 작업, 고정된 수의 작업파일 3개 동시 다운로드
데몬 스레드백그라운드 모니터링, 필수가 아닌 작업로그 수집, 헬스 체크
스레드 풀반복적인 작업, 서버의 요청 처리웹 서버, 배치 프로세서
multiprocessingCPU 바운드 (Python)이미지 변환, 과학 계산
async/awaitI/O 바운드, 높은 동시성API 서버, 크롤러

다음 장에서는 여러 스레드(또는 프로세스) 중 누구에게 CPU를 줄지 결정하는 CPU 스케줄링을 다루겠습니다. 스케줄링 알고리즘의 선택이 시스템 성능과 응답시간에 어떤 영향을 미치는지 분석합니다.

목차