icon

안동민 개발노트

6장: 힙과 우선순위 큐

우선순위 큐 설계와 안정성


우선순위 큐(Priority Queue)는 먼저 들어온 것이 아니라 더 중요한 것을 먼저 꺼내는 큐입니다. 그래서 일반 큐(FIFO)와 달리 비교 규칙을 우선 정해야 합니다. 실무에서 자주 터지는 버그도 자료구조보다 이 비교 규칙 누락에서 발생합니다.

특히 동점 처리 기준이 없으면 실행할 때마다 순서가 바뀔 수 있습니다. 아래에서는 우선순위 방향, 동점 처리, 재삽입 정책을 순서대로 정리합니다. 코드는 단순하게 유지하되 운영에서 필요한 안정성 포인트는 빠뜨리지 않습니다.

핵심 패턴 정돈: 우선순위 큐 안정성

우선순위 큐는 자료구조보다 정책을 우선 정해야 안정적으로 동작합니다. 코드를 보기 전에 동점일 때 처리 순서를 한 문장으로 고정해 보세요.

우선순위 방향은 작은 값을 우선 뽑을지, 큰 값을 우선 뽑을지 정하는 1차 기준입니다. 비교 함수(comparator)가 이 방향을 정의하며 전체 순서의 뼈대를 만듭니다. deadline이 작을수록 급한 문제라면 min-heap 방향을 선택하는 것이 자연스럽습니다.

동점 처리(tie-breaker)는 1차 우선순위가 같을 때의 2차 기준입니다. 결정성이 없으면 실행할 때마다 순서가 바뀌므로 디버깅과 재현이 어려워집니다. 삽입 순번 seq를 함께 저장하면 같은 우선순위에서도 항상 같은 처리 순서를 유지할 수 있습니다.

재삽입 정책은 작업 상태가 바뀌었을 때 큐를 어떻게 갱신할지 정한 운영 규칙입니다. 우선순위가 변한 작업을 새 엔트리로 넣고, 예전 엔트리를 무효 처리하는 방식이 흔히 쓰입니다. 예전 토큰을 pop 단계에서 무시하면, decrease-key를 직접 지원하지 않는 환경에서도 안전하게 동작합니다.

우선순위 큐 실전 장애와 연결하기

동일 우선순위 작업 순서가 매 실행마다 달라지면 재현과 장애 분석이 어려워집니다.
특히 알림/결제/재시도 큐에서는 작은 순서 차이가 큰 운영 차이를 만듭니다.

따라서 우선순위 큐는 값만 넣는 구조가 아니라 정책을 담는 구조로 설계해야 합니다.

우선순위 변경 테스트로 빠르게 검증하기

학습 효율을 높이려면 오답 -> 디버깅 -> 교정 -> 검증 순서로 로그를 남기면서 진행하는 것이 가장 빠릅니다.

동점 처리 규칙이 왜 필요한지 간단히 따라가 봅니다.

  • 작업: (prio=1, A), (prio=1, B), (prio=0, C)
  • 규칙: 우선순위 오름차순, 동점이면 먼저 들어온 순서
  1. C가 가장 먼저 처리됩니다(prio=0).
  2. A, B는 우선순위가 같으므로 삽입 순서 기준으로 처리됩니다.
  3. 동점 규칙이 없으면 실행마다 처리 순서가 달라질 수 있습니다.

우선순위 큐: 안정성을 고려한 우선순위 큐

문제 의도: 우선순위 + 순번(seq) 조합으로 안정적인 처리 순서를 보장합니다.
입력 예시: 동일 우선순위가 포함된 작업 목록
출력 예시: 재현 가능한 처리 순서

main.cpp
#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <vector>
using namespace std;

struct Job {
    int priority;
    int seq;
    string name;
    bool operator>(const Job& other) const {
        if (priority != other.priority) return priority > other.priority;
        return seq > other.seq;
    }
};

vector<Job> schedule(const vector<pair<int, string>>& items) {
    priority_queue<Job, vector<Job>, greater<Job>> pq;
    int seq = 0;
    for (const auto& item : items) {
        int priority = item.first;
        const string& name = item.second;
        pq.push({priority, seq, name});
        seq++;
    }
    vector<Job> out;
    while (!pq.empty()) {
        out.push_back(pq.top());
        pq.pop();
    }
    return out;
}

int main() {
    vector<pair<int, string>> items = {{1, "sync"}, {0, "alarm"}, {1, "retry"}, {0, "notify"}};
    auto out = schedule(items);
    for (const auto& j : out) {
        cout << "(" << j.priority << ", " << j.seq << ", " << j.name << ") ";
    }
    cout << "\n";
    return 0;
}
Main.java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class Main {
    static class Job implements Comparable<Job> {
        int priority, seq;
        String name;
        Job(int priority, int seq, String name) {
            this.priority = priority;
            this.seq = seq;
            this.name = name;
        }
        @Override
        public int compareTo(Job o) {
            if (priority != o.priority) return Integer.compare(priority, o.priority);
            return Integer.compare(seq, o.seq);
        }
        @Override
        public String toString() {
            return "(" + priority + ", " + seq + ", " + name + ")";
        }
    }

    static List<Job> schedule(int[][] priorities, String[] names) {
        PriorityQueue<Job> pq = new PriorityQueue<>();
        for (int i = 0; i < priorities.length; i++) {
            pq.offer(new Job(priorities[i][0], i, names[i]));
        }
        List<Job> out = new ArrayList<>();
        while (!pq.isEmpty()) out.add(pq.poll());
        return out;
    }

    public static void main(String[] args) {
        int[][] p = {{1}, {0}, {1}, {0}};
        String[] n = {"sync", "alarm", "retry", "notify"};
        System.out.println(schedule(p, n));
    }
}
main.py
import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class Job:
    priority: int
    seq: int
    name: str = field(compare=False)

def schedule(items):
    pq = []
    seq = 0

    for priority, name in items:
        heapq.heappush(pq, Job(priority, seq, name))
        seq += 1

    out = []
    while pq:
        job = heapq.heappop(pq)
        out.append((job.priority, job.seq, job.name))

    return out

items = [(1, "sync"), (0, "alarm"), (1, "retry"), (0, "notify")]
print(schedule(items))
# [(0, 1, 'alarm'), (0, 3, 'notify'), (1, 0, 'sync'), (1, 2, 'retry')]
main.rs
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn schedule(items: &[(i32, &str)]) -> Vec<(i32, i32, String)> {
    let mut pq = BinaryHeap::<Reverse<(i32, i32, String)>>::new();
    let mut seq = 0;

    for &(priority, name) in items {
        pq.push(Reverse((priority, seq, name.to_string())));
        seq += 1;
    }

    let mut out = Vec::new();
    while let Some(Reverse(item)) = pq.pop() {
        out.push(item);
    }
    out
}

fn main() {
    let items = [(1, "sync"), (0, "alarm"), (1, "retry"), (0, "notify")];
    println!("{:?}", schedule(&items));
}

우선순위 큐 설계 검토 힙 정렬성 확인

우선순위와 입력 순번을 함께 비교 기준에 포함하면 안정성이 확보됩니다.
compare=False를 사용해 이름 필드가 정렬 기준을 오염시키지 않도록 했습니다.
이 방식은 운영 재현성과 테스트 안정성에 큰 도움이 됩니다.

  • 평균 시간복잡도: 삽입/삭제 O(log N)
  • 최악 시간복잡도: 삽입/삭제 O(log N)
  • 공간복잡도: O(N)

우선순위 큐: 동적 우선순위 변경(재삽입 방식)

문제 의도: 우선순위 변경을 in-place 수정 대신 재삽입으로 처리하는 패턴을 보여줍니다.
입력 예시: 작업 report의 우선순위 재조정
출력 예시: 우선순위 변경 후 처리 순서

main.cpp
#include <iostream>
#include <queue>
#include <string>
#include <vector>
using namespace std;

struct Item {
    int priority;
    int seq;
    string task;
};

struct Cmp {
    bool operator()(const Item& a, const Item& b) const {
        if (a.priority != b.priority) return a.priority > b.priority;
        return a.seq > b.seq;
    }
};

vector<Item> reprioritizeDemo() {
    priority_queue<Item, vector<Item>, Cmp> pq;
    int counter = 0;
    pq.push({3, counter, "backup"});
    counter++;
    pq.push({1, counter, "alert"});
    counter++;
    pq.push({2, counter, "report"});
    counter++;
    pq.push({0, counter, "report"});
    counter++;

    vector<Item> out;
    while (!pq.empty()) {
        out.push_back(pq.top());
        pq.pop();
    }
    return out;
}

int main() {
    vector<Item> out = reprioritizeDemo();
    for (const Item& item : out) {
        cout << "(" << item.priority << ", " << item.seq << ", " << item.task << ") ";
    }
    cout << "\n";
    return 0;
}
Main.java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class Main {
    static class Item implements Comparable<Item> {
        int priority, seq;
        String task;
        Item(int priority, int seq, String task) {
            this.priority = priority;
            this.seq = seq;
            this.task = task;
        }
        @Override
        public int compareTo(Item o) {
            if (priority != o.priority) return Integer.compare(priority, o.priority);
            return Integer.compare(seq, o.seq);
        }
        @Override
        public String toString() {
            return "(" + priority + ", " + seq + ", " + task + ")";
        }
    }

    static List<Item> reprioritizeDemo() {
        PriorityQueue<Item> pq = new PriorityQueue<>();
        int counter = 0;
        pq.offer(new Item(3, counter++, "backup"));
        pq.offer(new Item(1, counter++, "alert"));
        pq.offer(new Item(2, counter++, "report"));
        pq.offer(new Item(0, counter++, "report"));
        List<Item> out = new ArrayList<>();
        while (!pq.isEmpty()) out.add(pq.poll());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(reprioritizeDemo());
    }
}
main.py
import heapq

def reprioritize_demo():
    pq = []
    counter = 0

    heapq.heappush(pq, (3, counter, "backup"))
    counter += 1
    heapq.heappush(pq, (1, counter, "alert"))
    counter += 1
    heapq.heappush(pq, (2, counter, "report"))
    counter += 1
    # report의 우선순위를 높이고 싶으면 새 항목으로 재삽입
    heapq.heappush(pq, (0, counter, "report"))
    counter += 1

    out = []
    while pq:
        out.append(heapq.heappop(pq))
    return out

print(reprioritize_demo())
main.rs
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn reprioritize_demo() -> Vec<(i32, i32, String)> {
    let mut pq = BinaryHeap::<Reverse<(i32, i32, String)>>::new();
    let mut counter = 0;

    pq.push(Reverse((3, counter, "backup".to_string())));
    counter += 1;
    pq.push(Reverse((1, counter, "alert".to_string())));
    counter += 1;
    pq.push(Reverse((2, counter, "report".to_string())));
    counter += 1;
    pq.push(Reverse((0, counter, "report".to_string())));
    counter += 1;

    let mut out = Vec::new();
    while let Some(Reverse(item)) = pq.pop() {
        out.push(item);
    }
    out
}

fn main() {
    println!("{:?}", reprioritize_demo());
}

우선순위 큐 설계 검토 우선순위 흐름 추적

힙 내부에서 임의 원소 우선순위를 직접 수정하는 것은 비용이 큽니다.
실무에서는 보통 새 우선순위로 재삽입 + 구 버전 무시 전략을 사용합니다.
이때 stale 항목 제거 정책을 함께 설계해야 합니다.

  • 시간복잡도: M개 엔트리 처리 기준 O(M log M)
  • 공간복잡도: O(M)

우선순위 큐: 우선순위 큐 + 유효성 토큰

문제 의도: stale 항목을 토큰으로 무시해 동적 변경 시 일관성을 유지합니다.
입력 예시: 같은 task_id에 대해 여러 번 우선순위 갱신
출력 예시: 최신 토큰 기준 유효 작업만 처리

main.cpp
#include <iostream>
#include <queue>
#include <string>
#include <unordered_map>
#include <vector>
using namespace std;

struct TokenItem {
    int prio;
    int seq;
    string taskId;
    int token;
};

struct TokenCmp {
    bool operator()(const TokenItem& a, const TokenItem& b) const {
        if (a.prio != b.prio) return a.prio > b.prio;
        return a.seq > b.seq;
    }
};

vector<pair<string, int>> processWithToken(const vector<pair<string, int>>& tasks) {
    priority_queue<TokenItem, vector<TokenItem>, TokenCmp> pq;
    unordered_map<string, int> valid;
    int seq = 0;

    for (const auto& task : tasks) {
        string taskId = task.first;
        int prio = task.second;
        int token = seq;
        valid[taskId] = token;
        TokenItem item = {prio, seq, taskId, token};
        pq.push(item);
        seq++;
    }

    vector<pair<string, int>> done;
    while (!pq.empty()) {
        TokenItem cur = pq.top();
        pq.pop();
        if (valid[cur.taskId] != cur.token) continue;
        done.push_back({cur.taskId, cur.prio});
    }
    return done;
}

int main() {
    vector<pair<string, int>> out = processWithToken({{"A", 2}, {"B", 1}, {"A", 0}});
    for (const auto& item : out) {
        cout << "(" << item.first << ", " << item.second << ") ";
    }
    cout << "\n";
    return 0;
}
Main.java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class Main {
    static class Item implements Comparable<Item> {
        int prio, seq, token;
        String taskId;
        Item(int prio, int seq, String taskId, int token) {
            this.prio = prio;
            this.seq = seq;
            this.taskId = taskId;
            this.token = token;
        }
        @Override
        public int compareTo(Item o) {
            if (prio != o.prio) return Integer.compare(prio, o.prio);
            return Integer.compare(seq, o.seq);
        }
    }

    static List<String> processWithToken(String[] ids, int[] prios) {
        PriorityQueue<Item> pq = new PriorityQueue<>();
        Map<String, Integer> valid = new HashMap<>();
        int seq = 0;
        for (int i = 0; i < ids.length; i++) {
            int token = seq;
            valid.put(ids[i], token);
            pq.offer(new Item(prios[i], seq, ids[i], token));
            seq++;
        }

        List<String> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            Item cur = pq.poll();
            if (!valid.get(cur.taskId).equals(cur.token)) continue;
            out.add("(" + cur.taskId + ", " + cur.prio + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(processWithToken(new String[]{"A", "B", "A"}, new int[]{2, 1, 0}));
    }
}
main.py
import heapq

def process_with_token(tasks):
    pq = []
    valid = {}
    seq = 0

    for task_id, prio in tasks:
        token = seq
        valid[task_id] = token
        heapq.heappush(pq, (prio, seq, task_id, token))
        seq += 1

    done = []
    while pq:
        prio, _, task_id, token = heapq.heappop(pq)
        if valid.get(task_id) != token:
            continue  # stale 항목 무시
        done.append((task_id, prio))
    return done

print(process_with_token([("A", 2), ("B", 1), ("A", 0)]))
main.rs
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

fn process_with_token(tasks: &[(&str, i32)]) -> Vec<(String, i32)> {
    let mut pq = BinaryHeap::<Reverse<(i32, i32, String, i32)>>::new();
    let mut valid = HashMap::<String, i32>::new();
    let mut seq = 0;

    for &(task_id, prio) in tasks {
        let token = seq;
        valid.insert(task_id.to_string(), token);
        pq.push(Reverse((prio, seq, task_id.to_string(), token)));
        seq += 1;
    }

    let mut done = Vec::new();
    while let Some(Reverse((prio, _, task_id, token))) = pq.pop() {
        if valid.get(&task_id) != Some(&token) {
            continue;
        }
        done.push((task_id, prio));
    }
    done
}

fn main() {
    println!("{:?}", process_with_token(&[("A", 2), ("B", 1), ("A", 0)]));
}

우선순위 큐 설계 검토 push/pop 재확인

토큰 방식은 우선순위 재삽입 시 stale 항목을 안전하게 무시하는 패턴입니다.
재시도 큐, 지연 큐 등 동적 정책 환경에서 유용합니다.
정책이 복잡할수록 데이터 구조보다 무효화 규칙이 중요해집니다.

  • 시간복잡도: M개 엔트리 처리 기준 O(M log M)
  • 공간복잡도: O(M)

우선순위 큐 정책 운용 관점

우선순위 큐를 설계할 때 확인할 질문입니다.

  • 동점 순서를 보장해야 하는가
  • 우선순위가 런타임에 바뀌는가
  • stale 항목이 발생할 수 있는가
  • 처리 지연과 공정성 중 무엇이 더 중요한가

이 질문에 답하면 비교 키와 큐 정책이 명확해집니다.

안정성 보장과 처리량 균형점

실무에서 자주 맞닥뜨리는 균형점입니다.

  • 엄격 우선순위: 지연 최소화 가능, 기아(starvation) 위험
  • 공정성 강화: 기아 완화, 평균 지연 증가 가능
  • 단순 힙: 구현 간결, 동적 정책 지원 제한
  • 토큰/버전 관리: 안정성 향상, 구현 복잡도 증가

성능, 공정성, 재현성 중 우선순위를 명확히 정해야 합니다.

우선순위 역전 오답 패턴과 교정 방법

우선순위 큐에서 흔한 오류입니다.

  • 우선순위 방향 반대로 구현
  • 동점 처리 규칙 누락
  • stale 항목 제거 없이 중복 처리 발생
  • mutable 객체의 키를 큐 삽입 후 변경

테스트는 동점 다수, 재삽입, stale 항목, 빈 큐 처리 케이스를 포함해야 합니다.

작업 분포와 복잡도 판단

같은 정답이라도 입력 분포에 따라 병목 지점이 달라지므로, 선택 관점을 시간복잡도 표 하나로 끝내지 말고 분포 가정까지 명시해야 합니다.

연산평균 시간최악 시간공간
삽입O(log N)O(log N)O(N)
최고 우선순위 추출O(log N)O(log N)O(N)
top 조회O(1)O(1)O(N)

동적 우선순위 변경은 재삽입 전략 기준으로 추가 비용을 고려해야 합니다.

우선순위 역전 문제분석 체크 포인트

  • 우선순위 단위(작을수록 높음/클수록 높음)를 코드와 문서에서 일치시켜야 합니다.
  • 큐 정책이 바뀌면 데이터 구조보다 비교 키 설계를 우선 수정해야 합니다.
  • 운영 로그에는 task_id, priority, seq를 함께 남겨 재현성을 확보합니다.
  • 멀티워커 환경에서는 분산 큐 일관성 정책까지 함께 설계해야 합니다.

운영 적용 요약

우선순위 큐를 안정적으로 운영하려면 비교 규칙, 동점 정책, stale 처리 규칙을 세트로 고정해야 합니다.
세 규칙 중 하나라도 문서와 코드가 어긋나면 재현 불가능한 순서 버그가 발생하기 쉽습니다.
정책을 코드 상수와 테스트 케이스로 동시에 관리하면 유지보수 비용을 크게 줄일 수 있습니다. 특히 재시도 큐가 있는 시스템에서는 이 기준이 장애 전파를 막는 핵심 안전장치가 됩니다. 운영 중 정책을 바꿔야 할 때도 영향 범위를 빠르게 추적할 수 있습니다.

제출 전 큐 안정성 검증 체크리스트

  • 우선순위 방향(작을수록/클수록 높음)을 명시했는가
  • 동점 처리 기준(seq, timestamp 등)을 코드에 반영했는가
  • 우선순위 갱신 시 stale 항목 무시 전략을 적용했는가
  • 재현 가능한 순서 검증 테스트를 자동화했는가

우선순위 큐 안정성 정리

우선순위 큐의 성능은 O(log N)으로 요약되지만, 품질은 비교 규칙과 정책 설계에서 결정됩니다.
안정성과 동적 변경 전략을 함께 설계하면 운영 환경에서도 예측 가능한 처리 순서를 유지할 수 있습니다.

목차