상위 K개, 스트림 처리와 힙 응용
Top-K 문제는 전체 중 상위 K개만 필요할 때 등장합니다. 데이터가 계속 들어오는 스트림 환경에서는 매번 전체 정렬을 하면 느리고 메모리도 많이 씁니다. 그래서 크기 K의 최소 힙을 두고, 경계값보다 큰 값만 남기는 방식으로 풉니다.
여기서 중요한 질문은 무엇을 더 저장할까?보다 무엇을 버려도 안전한가?입니다.
여기서는 규칙을 단순화해 따라오기 쉽게 구성하고,
코테 실전에서 쓰는 O(N log K) 접근은 그대로 유지합니다.
핵심 원리 정돈: 상위 K·스트림 처리
Top-K는 어떤 값을 남길지보다 어떤 값을 버려도 되는지를 우선 정하면 쉬워집니다. 지금 문제에서 버려도 안전한 조건을 한 줄로 써 놓고 아래 개념을 읽어 보세요.
경계값(boundary)은 현재 Top-K에 들어오기 위해 넘어야 하는 최소 기준입니다.
최소 힙 방식에서는 힙 루트가 곧 경계값이고, 현재 선택된 K개 중 최솟값을 뜻합니다.
Top-3가 [9,7,5]라면 경계값은 5이고, 새 값이 5 이하이면 유지해도 결과가 바뀌지 않습니다.
크기 K 최소 힙은 상위 K개만 유지하는 필터 역할을 합니다.
힙 크기를 K로 고정하면 입력 길이가 커져도 상태 공간이 O(K)를 넘지 않습니다.
입력이 1만 개여도 힙 안에는 최대 K개만 남기기 때문에 메모리 관리가 쉬워집니다.
교체 규칙은 힙이 꽉 찼을 때 어떤 값을 교체할지 정한 정책입니다.
일반적으로 x > heap[0]일 때만 pop+push를 수행해 경계값을 갱신합니다.
경계값이 5일 때 새 값 8이 들어오면 5를 제거하고 8을 넣어 Top-K를 유지합니다.
Top-K 처리의 운영 영향
스트림 시스템에서는 정렬을 매번 수행할 수 없습니다.
메모리 제한과 지연 시간 제한이 동시에 존재하기 때문입니다.
힙 방식은 다음 장점이 있습니다.
- 메모리 사용량
O(K)유지 - 입력 길이와 무관한 상태 크기
- 실시간 갱신 가능
스트림 Top-K 디버깅 시나리오
여기서는 오답 -> 디버깅 -> 교정 -> 검증 흐름으로 추적해, 어디서 불변식이 깨지는지 단계별로 확인합니다.
Top-K 스트림을 최소 힙으로 유지하는 과정을 짧게 따라가 봅니다.
- 입력 스트림:
5, 1, 9, 3, 7 K=3
- 처음 3개(
5,1,9)는 그대로 힙에 넣습니다. - 다음 값
3은 힙 최소값보다 크면 교체, 작으면 버립니다. - 마지막 값
7도 같은 규칙으로 처리해 항상 상위 3개만 유지합니다.
Top-K: 기본 Top-K 스트림
문제 의도: 스트림 입력에서 최소 힙으로 Top-K를 유지하는 기본 패턴을 학습합니다.
입력 예시: values=[5,1,9,3,7], k=3
출력 예시: 상위 3개 값(정렬 출력 기준 [9,7,5])
힙의 맨 위 값은 현재 Top-K 경계(가장 작은 후보)이므로 교체 여부 판단에 바로 사용합니다.
#include <algorithm>
#include <functional>
#include <iostream>
#include <queue>
#include <vector>
using namespace std;
vector<int> topKStream(const vector<int>& values, int k) {
if (k <= 0) return {};
priority_queue<int, vector<int>, greater<int>> h;
for (int x : values) {
if ((int)h.size() < k) {
h.push(x);
} else if (x > h.top()) {
h.pop();
h.push(x);
}
}
vector<int> out;
while (!h.empty()) {
out.push_back(h.top());
h.pop();
}
sort(out.rbegin(), out.rend());
return out;
}
int main() {
vector<int> ans = topKStream({7, 2, 9, 4, 11, 3, 10, 5, 8}, 3);
for (int x : ans) cout << x << " ";
cout << "\n";
return 0;
}import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
public class Main {
static List<Integer> topKStream(int[] values, int k) {
if (k <= 0) return new ArrayList<>();
PriorityQueue<Integer> h = new PriorityQueue<>();
for (int x : values) {
if (h.size() < k) {
h.offer(x);
} else if (x > h.peek()) {
h.poll();
h.offer(x);
}
}
List<Integer> out = new ArrayList<>();
while (!h.isEmpty()) {
out.add(h.poll());
}
int left = 0;
int right = out.size() - 1;
while (left < right) {
int tmp = out.get(left);
out.set(left, out.get(right));
out.set(right, tmp);
left++;
right--;
}
return out;
}
public static void main(String[] args) {
System.out.println(topKStream(new int[]{7, 2, 9, 4, 11, 3, 10, 5, 8}, 3));
}
}import heapq
def top_k_stream(values, k):
if k <= 0:
return []
heap = []
for x in values:
if len(heap) < k:
heapq.heappush(heap, x)
else:
if x > heap[0]:
heapq.heapreplace(heap, x)
out = []
while heap:
out.append(heapq.heappop(heap))
out.reverse()
return out
print(top_k_stream([7, 2, 9, 4, 11, 3, 10, 5, 8], 3))
# [11, 10, 9]use std::cmp::Reverse;
use std::collections::BinaryHeap;
fn top_k_stream(values: &[i32], k: usize) -> Vec<i32> {
if k == 0 {
return vec![];
}
let mut h = BinaryHeap::<Reverse<i32>>::new();
for &x in values {
if h.len() < k {
h.push(Reverse(x));
} else if let Some(&Reverse(min_v)) = h.peek() {
if x > min_v {
h.pop();
h.push(Reverse(x));
}
}
}
let mut out = Vec::new();
while let Some(Reverse(x)) = h.pop() {
out.push(x);
}
out.reverse();
out
}
fn main() {
println!("{:?}", top_k_stream(&[7, 2, 9, 4, 11, 3, 10, 5, 8], 3));
}상위 K 힙 응용 검토 힙 정렬성 확인
힙 루트가 경계값 역할을 하므로 작은 값은 즉시 버릴 수 있습니다.
전체 정렬 없이 상위 K개만 유지하므로 스트림 처리에 적합합니다.
k == 0 같은 경계값 처리도 초기에 고정해야 안정적입니다.
- 평균 시간복잡도:
O(N log K) - 최악 시간복잡도:
O(N log K) - 공간복잡도:
O(K)
Top-K: (점수, 사용자) Top-K
문제 의도: 단일 값이 아닌 (점수, 사용자) 레코드의 Top-K 유지 규칙을 확인합니다.
입력 예시: (score,user) 스트림, k=3
출력 예시: 점수 기준 상위 사용자 3명
레코드 문제도 핵심은 같고, 비교 기준이 되는 점수만 일관되게 꺼내서 힙에 넣으면 됩니다.
#include <algorithm>
#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>
using namespace std;
vector<pair<string, int>> topKUsers(const vector<pair<string, int>>& records, int k) {
priority_queue<pair<int, string>, vector<pair<int, string>>, greater<pair<int, string>>> h;
for (const auto& rec : records) {
string user = rec.first;
int score = rec.second;
pair<int, string> item = {score, user};
if ((int)h.size() < k) {
h.push(item);
} else if (score > h.top().first) {
h.pop();
h.push(item);
}
}
vector<pair<string, int>> out;
while (!h.empty()) {
int score = h.top().first;
string user = h.top().second;
h.pop();
out.push_back({user, score});
}
sort(out.begin(), out.end(), [](const pair<string, int>& a, const pair<string, int>& b) {
return a.second > b.second;
});
return out;
}
int main() {
vector<pair<string, int>> records = {{"a", 70}, {"b", 85}, {"c", 90}, {"d", 60}, {"e", 88}};
vector<pair<string, int>> ans = topKUsers(records, 2);
for (const auto& item : ans) cout << "(" << item.first << ", " << item.second << ") ";
cout << "\n";
return 0;
}import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
public class Main {
static class UserScore {
String user;
int score;
UserScore(String user, int score) {
this.user = user;
this.score = score;
}
@Override
public String toString() {
return "(" + user + ", " + score + ")";
}
}
static List<UserScore> topKUsers(List<UserScore> records, int k) {
PriorityQueue<UserScore> h = new PriorityQueue<>(
new Comparator<UserScore>() {
@Override
public int compare(UserScore a, UserScore b) {
return Integer.compare(a.score, b.score);
}
}
);
for (UserScore item : records) {
if (h.size() < k) h.offer(item);
else if (item.score > h.peek().score) {
h.poll();
h.offer(item);
}
}
List<UserScore> out = new ArrayList<>(h);
out.sort(
new Comparator<UserScore>() {
@Override
public int compare(UserScore a, UserScore b) {
return Integer.compare(b.score, a.score);
}
}
);
return out;
}
public static void main(String[] args) {
List<UserScore> records = List.of(
new UserScore("a", 70), new UserScore("b", 85), new UserScore("c", 90),
new UserScore("d", 60), new UserScore("e", 88)
);
System.out.println(topKUsers(records, 2));
}
}import heapq
def top_k_users(records, k):
# records: (user, score)
heap = [] # (score, user)
for user, score in records:
item = (score, user)
if len(heap) < k:
heapq.heappush(heap, item)
else:
if score > heap[0][0]:
heapq.heapreplace(heap, item)
out = []
while heap:
score, user = heapq.heappop(heap)
out.append((user, score))
out.reverse()
return out
records = [("a", 70), ("b", 85), ("c", 90), ("d", 60), ("e", 88)]
print(top_k_users(records, 2)) # [('c', 90), ('e', 88)]use std::cmp::Reverse;
use std::collections::BinaryHeap;
fn top_k_users(records: &[(&str, i32)], k: usize) -> Vec<(String, i32)> {
let mut h: BinaryHeap<Reverse<(i32, String)>> = BinaryHeap::new();
for &(user, score) in records {
let item = Reverse((score, user.to_string()));
if h.len() < k {
h.push(item);
} else if let Some(Reverse((min_score, _))) = h.peek() {
if score > *min_score {
h.pop();
h.push(item);
}
}
}
let mut out = Vec::new();
while let Some(Reverse((score, user))) = h.pop() {
out.push((user, score));
}
out.reverse();
out
}
fn main() {
let records = [("a", 70), ("b", 85), ("c", 90), ("d", 60), ("e", 88)];
println!("{:?}", top_k_users(&records, 2));
}상위 K 힙 응용 검토 우선순위 흐름 추적
실무에서는 값 하나가 아니라 레코드 단위로 Top-K를 구합니다.
이때 비교 키를 명확히 분리하지 않으면 동점/정렬 기준이 흔들릴 수 있습니다.
튜플 우선순위를 이해하고 명시적 키 정책을 문서화해야 합니다.
- 시간복잡도:
O(N log K) - 공간복잡도:
O(K)
Top-K: 윈도우별 Top-K 스냅샷
문제 의도: 전체 Top-K가 아닌 구간(윈도우)별 Top-K를 분리 계산합니다.
입력 예시: values=[...], k=2, window=4
출력 예시: 윈도우별 상위 2개 결과 리스트
윈도우마다 힙을 새로 만들면 각 구간의 상위 값만 독립적으로 계산할 수 있습니다.
#include <algorithm>
#include <functional>
#include <iostream>
#include <queue>
#include <vector>
using namespace std;
vector<vector<int>> topKEachWindow(const vector<int>& values, int k, int w) {
vector<vector<int>> out;
for (int i = 0; i < (int)values.size(); i += w) {
priority_queue<int, vector<int>, greater<int>> h;
for (int j = i; j < min(i + w, (int)values.size()); ++j) {
int x = values[j];
if ((int)h.size() < k) h.push(x);
else if (x > h.top()) {
h.pop();
h.push(x);
}
}
vector<int> chunk;
while (!h.empty()) {
chunk.push_back(h.top());
h.pop();
}
sort(chunk.rbegin(), chunk.rend());
out.push_back(chunk);
}
return out;
}
int main() {
auto ans = topKEachWindow({5, 1, 9, 3, 7, 2, 8, 6, 4}, 2, 3);
for (auto& row : ans) {
cout << "[";
for (int i = 0; i < (int)row.size(); ++i) {
cout << row[i] << (i + 1 == (int)row.size() ? "" : ", ");
}
cout << "] ";
}
cout << "\n";
return 0;
}import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
public class Main {
static List<List<Integer>> topKEachWindow(int[] values, int k, int w) {
List<List<Integer>> out = new ArrayList<>();
for (int i = 0; i < values.length; i += w) {
PriorityQueue<Integer> h = new PriorityQueue<>();
for (int j = i; j < Math.min(i + w, values.length); j++) {
int x = values[j];
if (h.size() < k) h.offer(x);
else if (x > h.peek()) {
h.poll();
h.offer(x);
}
}
List<Integer> row = new ArrayList<>();
while (!h.isEmpty()) {
row.add(h.poll());
}
int left = 0;
int right = row.size() - 1;
while (left < right) {
int temp = row.get(left);
row.set(left, row.get(right));
row.set(right, temp);
left++;
right--;
}
out.add(row);
}
return out;
}
public static void main(String[] args) {
System.out.println(topKEachWindow(new int[]{5,1,9,3,7,2,8,6,4}, 2, 3));
}
}import heapq
def top_k_each_window(values, k, w):
out = []
for i in range(0, len(values), w):
chunk = values[i:i + w]
h = []
for x in chunk:
if len(h) < k:
heapq.heappush(h, x)
elif x > h[0]:
heapq.heapreplace(h, x)
chunk_out = []
while h:
chunk_out.append(heapq.heappop(h))
chunk_out.reverse()
out.append(chunk_out)
return out
print(top_k_each_window([5,1,9,3,7,2,8,6,4], 2, 3))
# [[9, 5], [7, 3], [8, 6]]use std::cmp::Reverse;
use std::collections::BinaryHeap;
fn top_k_each_window(values: &[i32], k: usize, w: usize) -> Vec<Vec<i32>> {
let mut out = Vec::new();
let mut i = 0usize;
while i < values.len() {
let mut h = BinaryHeap::<Reverse<i32>>::new();
for &x in &values[i..usize::min(i + w, values.len())] {
if h.len() < k {
h.push(Reverse(x));
} else if let Some(&Reverse(min_v)) = h.peek() {
if x > min_v {
h.pop();
h.push(Reverse(x));
}
}
}
let mut row = Vec::new();
while let Some(Reverse(v)) = h.pop() {
row.push(v);
}
row.reverse();
out.push(row);
i += w;
}
out
}
fn main() {
println!("{:?}", top_k_each_window(&[5, 1, 9, 3, 7, 2, 8, 6, 4], 2, 3));
}상위 K 힙 응용 검토 push/pop 재확인
윈도우 단위 스냅샷은 로그 분석/모니터링에서 자주 사용됩니다.
전역 Top-K와 달리 구간별 특성을 볼 수 있어 이상 징후 탐지에 유리합니다.
요구사항이 전역인지 구간별인지 우선 구분해야 설계가 맞습니다.
- 시간복잡도:
O(N log K)(윈도우별 처리 합산) - 공간복잡도:
O(K)
스트림 Top-K 운용 비교 관점
힙 기반 Top-K를 선택할 신호입니다.
- 데이터가 스트림으로 계속 들어온다
- 상위 K만 필요하고 전체 정렬은 불필요하다
- 메모리 상한이 명확하다
- 실시간 갱신이 필요하다
이 조건을 만족하면 힙이 가장 단순하고 안정적인 선택입니다.
메모리 제한 하 Top-K 성능 균형점
Top-K 구현에서 자주 맞닥뜨리는 균형점입니다.
- 힙 방식: 메모리 효율적, 최종 결과 정렬 추가 필요
- 전체 정렬: 구현 단순, 대규모 데이터에서 비용 큼
- 정확 Top-K: 비용 큼
- 근사 Top-K: 비용 감소, 정확도 손실
서비스 요구가 정확도 중심인지 지연 중심인지에 따라 선택이 달라집니다.
실전 확장: 다중 키 Top-K
실무에서는 값 하나가 아니라 여러 지표를 섞어 Top-K를 뽑아야 할 때가 많습니다.
처음 구현할 때는 우선 복합 점수 -> 단일 숫자로 바꾼 뒤 힙에 넣는 방식이 안전합니다.
아래 예제는 score = revenue * 8 + clicks * 2 규칙으로 다중 키를 단일 점수로 바꿉니다.
복잡한 다중 기준은 먼저 하나의 숫자 점수로 변환하면 구현이 크게 단순해집니다.
#include <algorithm>
#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <vector>
using namespace std;
struct Row {
string id;
int revenue;
int clicks;
};
int calcScore(int revenue, int clicks) {
return revenue * 8 + clicks * 2;
}
vector<pair<string, int>> topKMulti(const vector<Row>& records, int k) {
priority_queue<pair<int, string>, vector<pair<int, string>>, greater<pair<int, string>>> h;
for (const auto& row : records) {
int score = calcScore(row.revenue, row.clicks);
string id = row.id;
pair<int, string> item = {score, id};
if ((int)h.size() < k) h.push(item);
else if (score > h.top().first) {
h.pop();
h.push(item);
}
}
vector<pair<string, int>> out;
while (!h.empty()) {
int score = h.top().first;
string id = h.top().second;
h.pop();
out.push_back({id, score});
}
sort(out.begin(), out.end(), [](const pair<string, int>& a, const pair<string, int>& b) {
return a.second > b.second;
});
return out;
}
int main() {
vector<Row> data = {
{"A", 100, 50}, {"B", 90, 80}, {"C", 120, 20}, {"D", 60, 120}
};
vector<pair<string, int>> out = topKMulti(data, 2);
for (const auto& item : out) {
cout << "(" << item.first << ", " << item.second << ") ";
}
cout << "\n";
return 0;
}import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
public class Main {
static class Row {
String id;
int revenue;
int clicks;
Row(String id, int revenue, int clicks) {
this.id = id;
this.revenue = revenue;
this.clicks = clicks;
}
}
static class Item {
String id;
int score;
Item(String id, int score) {
this.id = id;
this.score = score;
}
@Override
public String toString() {
return "(" + id + ", " + score + ")";
}
}
static int calcScore(int revenue, int clicks) {
return revenue * 8 + clicks * 2;
}
static List<Item> topKMulti(List<Row> records, int k) {
PriorityQueue<Item> h = new PriorityQueue<>(
new Comparator<Item>() {
@Override
public int compare(Item a, Item b) {
return Integer.compare(a.score, b.score);
}
}
);
for (Row row : records) {
int score = calcScore(row.revenue, row.clicks);
Item item = new Item(row.id, score);
if (h.size() < k) h.offer(item);
else if (score > h.peek().score) {
h.poll();
h.offer(item);
}
}
List<Item> out = new ArrayList<>();
while (!h.isEmpty()) {
out.add(h.poll());
}
out.sort(
new Comparator<Item>() {
@Override
public int compare(Item a, Item b) {
return Integer.compare(b.score, a.score);
}
}
);
return out;
}
public static void main(String[] args) {
List<Row> data = List.of(
new Row("A", 100, 50), new Row("B", 90, 80),
new Row("C", 120, 20), new Row("D", 60, 120)
);
System.out.println(topKMulti(data, 2));
}
}import heapq
def calc_score(revenue, clicks):
return revenue * 8 + clicks * 2
def top_k_multi(records, k):
# records: (id, revenue, clicks)
heap = [] # (score, id)
for rid, rev, clk in records:
score = calc_score(rev, clk)
item = (score, rid)
if len(heap) < k:
heapq.heappush(heap, item)
elif score > heap[0][0]:
heapq.heapreplace(heap, item)
out = []
while heap:
score, rid = heapq.heappop(heap)
out.append((rid, score))
out.reverse()
return out
data = [("A", 100, 50), ("B", 90, 80), ("C", 120, 20), ("D", 60, 120)]
print(top_k_multi(data, 2))use std::cmp::Reverse;
use std::collections::BinaryHeap;
fn calc_score(revenue: i32, clicks: i32) -> i32 {
revenue * 8 + clicks * 2
}
fn top_k_multi(records: &[(&str, i32, i32)], k: usize) -> Vec<(String, i32)> {
let mut h: BinaryHeap<Reverse<(i32, String)>> = BinaryHeap::new();
for &(id, revenue, clicks) in records {
let score = calc_score(revenue, clicks);
let item = Reverse((score, id.to_string()));
if h.len() < k {
h.push(item);
} else if let Some(Reverse((min_score, _))) = h.peek() {
if score > *min_score {
h.pop();
h.push(item);
}
}
}
let mut out = Vec::new();
while let Some(Reverse((score, id))) = h.pop() {
out.push((id, score));
}
out.reverse();
out
}
fn main() {
let data = [("A", 100, 50), ("B", 90, 80), ("C", 120, 20), ("D", 60, 120)];
println!("{:?}", top_k_multi(&data, 2));
}이처럼 점수 계산 함수를 분리하면, 정책이 바뀌어도 힙 로직은 거의 건드리지 않아도 됩니다. 처음에는 점수 계산과 Top-K 유지를 분리해서 구현하는 습관이 안전합니다.
스트림 Top-K 운영 검토 체크리스트
스트림 Top-K 운영에서는 다음을 함께 점검해야 합니다.
- 윈도우 단위 초기화 주기
- 동점 정렬 규칙
- 지연 허용치 대비 배치 크기
- 메모리 상한 초과 시 폴백 정책
윈도우 반례와 복구 절차
Top-K 구현에서 흔한 오류입니다.
- 힙 비교 방향을 반대로 설정
k가 입력 길이보다 큰 경우 미처리- 동점 처리 규칙 미정의
- 레코드 비교에서 사용자 ID가 우선순위를 덮어씀
테스트는 k=0, k=1, k>N, 동점 다수 케이스를 반드시 포함해야 합니다.
Top-K 성능 경계 정리
균형 지점 판단 시에는 평균 성능뿐 아니라 최악 입력 분포, 메모리 제약, 구현 복잡도를 함께 비교해야 합니다.
| 방식 | 평균 시간 | 최악 시간 | 공간 |
|---|---|---|---|
| 힙 유지 Top-K | O(N log K) | O(N log K) | O(K) |
| 전체 정렬 후 절단 | O(N log N) | O(N log N) | O(N) |
K << N인 환경일수록 힙 방식 이점이 커집니다.
윈도우 갱신 문제분석 체크 포인트
- 입력이 무한 스트림이면 종료 시점과 출력 주기를 별도로 정의해야 합니다.
- 상태를 주기적으로 초기화할지 누적할지 정책을 우선 고정합니다.
- 성능 측정 시 파싱/직렬화 비용을 분리해야 순수 알고리즘 비용을 볼 수 있습니다.
- 멀티스레드 환경에서는 힙 접근 동기화 전략을 함께 설계해야 합니다.
Top-K 품질 체크포인트
- 최소 힙 루트를 경계값으로 사용하는 원리를 설명할 수 있는가
k=0,k>N, 동점 다수 케이스를 테스트했는가- 전역 Top-K와 윈도우 Top-K 요구를 구분했는가
- 정렬 출력이 필요한 시점만 추가 정렬하도록 설계했는가
Top-K 스트림 전략 정리
Top-K 문제의 본질은 경계값을 유지해 불필요한 데이터를 버리는 것입니다.
최소 힙 기반 설계를 익히면 스트림 처리 문제를 적은 메모리로 안정적으로 해결할 수 있습니다.