지난번에 만들었던 스레드풀은 mutex를 이용하여 하나의 작업 큐(공유자원)을 lock-unlock 을 이용하여 각 스레드가 작업을 가져갔습니다.
스레드가 많고 작업 시간이 짧을 수록 더 많은 경쟁을 할 것인데 이로 인해 작업을 처리하는 비용보다 작업을 빼오는 오버헤드가 더 클 것 같습니다. 😨
💡 항상 Lock-free가 빠른 것은 아님!
따라서 벤치마킹 후 성능이 좋은 쪽을 선택해야 합니다.
기존 스레드풀에서 thread-unsafety 였던 큐를 Lock-free 큐로 변경을 하려고 합니다. 그러기 위해 큐를 직접 구현해야 하는데 큐의 조건은 다음과 같습니다.
- 여러 스레드가 push를 할 수 있음 (Multi-Procedure)
- bottom 을 가리키는 변수 또는 포인터를 다수의 스레드가 읽고 쓸 수 있음
- 풀에 작업을 넣는 스레드는 특정 한 스레드가 아님
- 여러 스레드가 pop을 할 수 있음 (Multi-Consumer)
- Top 을 가리키는 변수 또는 포인터를 다수의 스레드가 읽고 쓸 수 있음
- 풀에 생성된 스레드들이 작업을 가져감
따라서 구현해야 하는 큐는 MPMC Queue 입니다. 큐를 만들 때 Linked list 처럼 노드를 그 때 마다 생성해서 잇는 방법이 있을테고, 고정된 버퍼를 미리 만들어 놓는 방법도 있을겁니다.
저는 고정된 버퍼에 순환하는 방식(Ring buffer)으로 만들어 보도록 하겠습니다. 👍
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
| // queue.hpp
#ifndef __QUEUE_H__
#define __QUEUE_H__
#include<atomic>
#include<cstdint>
template<typename T, uint64_t CAPACITY = 1024>
class Queue {
constexpr static uint8_t CACHE_LINE = 64;
constexpr static uint64_t roundup_pow2(uint64_t n) {
n -= 1;
n |= n >> 1;
n |= n >> 2;
n |= n >> 4;
n |= n >> 8;
n |= n >> 16;
n |= n >> 32;
return n + 1;
}
struct Slot {
// ABA Problem
// CAS 연산에서 예상값 감지를 못하는 문제 (A→B→A)
// 배열마다 sequence 를 두어 항상 증가하도록 함 (A→B→C...)
std::atomic<uint64_t> sequence;
T data{};
};
public:
Queue()
: _top(0)
, _bottom(0) {
_buffer = std::make_unique<Slot[]>(_capacity);
for(uint64_t idx = 0; idx < _capacity; idx++) {
_buffer[idx].sequence.store(idx);
}
}
~Queue() {}
template<typename U = T>
bool push(U &&data) {
uint64_t bottom = _bottom.load(std::memory_order_relaxed);
for(;;) {
Slot *slot = &_buffer[bottom & (_capacity - 1)];
uint64_t seq = slot->sequence.load();
int64_t diff = (int64_t)seq - (int64_t)bottom;
if(diff == 0) {
// 슬롯 비어있음 넣어도 됨
if(_bottom.compare_exchange_weak(bottom, bottom + 1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
slot->data = std::forward<U>(data);
slot->sequence.store(seq + 1, std::memory_order_release);
return true;
}
} else if(diff > 0) {
// 비어있지 않음
bottom = _bottom.load(std::memory_order_relaxed);
continue;
} else {
// 큐가 꽉참
return false;
}
}
}
bool pop(T &data) {
uint64_t top = _top.load(std::memory_order_relaxed);
for(;;) {
Slot *slot = &_buffer[top & (_capacity - 1)];
uint64_t seq = slot->sequence.load(std::memory_order_acquire);
int64_t diff = (int64_t)seq - (int64_t)(top + 1);
if(diff == 0) {
// 데이터 있음
if(_top.compare_exchange_weak(top, top + 1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
data = slot->data;
slot->sequence.store(top + _capacity, std::memory_order_release);
return true;
}
} else {
// 데이터 없음
return false;
}
}
}
uint64_t size() {
return _bottom.load(std::memory_order_relaxed) - _top.load(std::memory_order_relaxed);
}
bool empty() {
return _bottom.load(std::memory_order_relaxed) - _top.load(std::memory_order_relaxed) == 0;
}
private:
const uint64_t _capacity = roundup_pow2(CAPACITY);
std::unique_ptr<Slot[]> _buffer;
alignas(CACHE_LINE) std::atomic<uint64_t> _top;
alignas(CACHE_LINE) std::atomic<uint64_t> _bottom;
};
#endif // __QUEUE_H__
|
💡 CAS(Compare-And-Swap) 연산
- 다수의 스레드가 Lock 없이 안전하게 데이터를 세팅할 수 있도록 하는 연산입니다.
- “예상하는 값이 들어 있다면 새로운 값으로 변경해라”
- 예상하는 값이 아니다 → 다른 스레드가 먼저 바꿨다 (실패 후 expect 에 최신값으로 갱신)
- 예상하는 값이다 → 내가 선점했고 변경할 수 있다 (성공)
- C++ 에서는
compare_exchange_weak
과 compare_exchange_strong
*제공 - CAS 연산에는 ABA 문제가 있음
- 예상값이 사실 예상값이 아니었던 것
- “예상하는 값 A” → “B 로 바뀜” → “A 로 다시 바뀜” → “CAS 연산 성공”
- 같은 주소의 집에 세입자가 바뀌었지만 우체부는 인지 못하고 우편물 배달
- 위 Ring buffer 에서
_top
, _bottom
그리고 Slot
에 있는 sequence
는 항상 증분함(ABA 원천 제거)
✅ memory_order 관련
_top
과 _bottom
은 push()/pop()
할 때 처음 로드하긴 하지만 중요하지 않음 → 어차피 큐가 찼는지, 사용할 수 있는지는 sequence
로 구분하며, 최종적으로 데이터를 삽입할 때는 CAS 연산 이용
✅ 정합성 관련
push thread 4개, pop thread 8개로 4억번 테스트 💯
더 정확한 테스트를 위해 TSAN 테스트 필요함
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| // ThreadPool.h
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include<thread>
#include<mutex>
#include<condition_variable>
#include<functional>
#include<future>
#include<type_traits>
#include<vector>
#include<queue.hpp>
class ThreadPool {
constexpr static uint32_t SPIN_LOCK_NUM = 1000;
using Task = std::function<void()>;
public:
ThreadPool(size_t pool_size);
~ThreadPool();
template<typename Func, typename... Args>
std::future<std::invoke_result_t<Func, Args...>> add(Func &&func, Args&&... args);
private:
bool _destroy;
size_t _pool_size;
std::vector<std::thread> _threads;
std::mutex _mutex;
std::condition_variable _cv;
Queue<Task> _tasks;
int worker();
};
template<typename Func, typename... Args>
std::future<std::invoke_result_t<Func, Args...>> ThreadPool::add(Func &&func, Args&&... args) {
std::shared_ptr<std::packaged_task<std::invoke_result_t<Func, Args...>()>> task = \
std::make_shared<std::packaged_task<std::invoke_result_t<Func, Args...>()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
while(!_tasks.push([task] { (*task)(); })) {
std::this_thread::yield();
}
_cv.notify_one();
return task->get_future();
}
#endif // __THREAD_POOL_H__
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| // ThreadPool.cc
#include<thread_pool.h>
ThreadPool::ThreadPool(size_t pool_size)
: _destroy(false)
, _pool_size(pool_size) {
for(size_t idx = 0; idx < _pool_size; idx++) {
_threads.emplace_back(&ThreadPool::worker, this);
}
}
ThreadPool::~ThreadPool() {
_mutex.lock();
_destroy = true;
_mutex.unlock();
_cv.notify_all();
for(std::thread &th : _threads) {
if(th.joinable()) {
th.join();
}
}
}
int ThreadPool::worker() {
Task task = nullptr;
uint32_t spin = 0;
for(;;) {
if(_tasks.pop(task)) {
if(task) {
task();
}
} else {
if(spin < SPIN_LOCK_NUM) {
std::this_thread::yield();
spin++;
} else {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] { return _destroy || !_tasks.empty(); });
if(_destroy) break;
spin = 0;
}
}
}
return 0;
}
|
💡 Spin lock
스핀락(spinlock)은 임계 구역(critical section)에 진입이 불가능할 때 진입이 가능할 때까지 루프를 돌면서 재시도하는 방식으로 구현된 락을 가리킨다. (busy-waiting)
스핀락은 운영 체제의 스케줄링 지원을 받지 않기 때문에, 해당 스레드에 대한 문맥 교환이 일어나지 않는다. - Wikipeida
기존 스레드풀과 똑같고 단지 작업 큐만 바뀌었다.
🔬 Mutex vs Lock-free#
앞서 말했듯이 짧은 작업과 스레드가 많은 환경에서 mutex 가 불리할 것으로 예상했습니다.
그럼 정말 그런지 테스트를 진행해 보겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| #include<catch2/catch_all.hpp>
#include<spdlog/spdlog.h>
#include<thread_pool.h>
TEST_CASE("10,000,000 add/execute") {
const size_t COUNT = 10'000'000;
ThreadPool pool(12);
std::vector<std::future<void>> ret(COUNT);
std::atomic<size_t> count(0);
std::chrono::high_resolution_clock::time_point start;
std::chrono::high_resolution_clock::time_point end;
start = std::chrono::high_resolution_clock::now();
for(size_t i = 0; i < COUNT; i++) {
ret[i] = pool.add([&] { count++; });
}
for(std::future<void> &f : ret) {
f.wait();
}
end = std::chrono::high_resolution_clock::now();
std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
REQUIRE(count.load() == COUNT);
spdlog::info("time: {}ms", duration.count());
}
|
12개의 스레드를 가진 풀에 단순히 카운트만 올리는 작업을 1천만번 넣고 돌립니다.
No. | mutex lock | lock free | diff | per |
---|
1 | 22.262 s | 4.026 s | 18.236 s | -81.91% |
2 | 22.798 s | 4.035 s | 18.763 s | -82.3% |
3 | 22.120 s | 4.043 s | 18.077 s | -81.72% |
4 | 22.591 s | 4.059 s | 18.532 | -82.03% |
5 | 22.303 s | 4.032 s | 18.271 | -81.92% |
💡 위에서도 말했듯이 항상 lock-free 가 빠른것은 아닙니다.
💡 mutex lock 은 경쟁이 생기면 스레드를 블록 시키고 컨텍스트 스위칭이 일어나 비용이 발생하는데 이 비용이 큼
CAS 는 CPU 원자적 명령어 한 두개로 끝나며 오버헤드는 작지만 실패했을 경우 계속 CPU 자원을 쓰므로(Loop) 효율적이진 않음
✅ 스레드풀은, 특히 Lock-free 와 같이 복잡하고 디버깅이 어려운 자료구조를 사용하는 것들은 검증된 라이브러리를 사용합시다.
디버깅이 안됩니다 😭
oneTBB, Folly, Boost 등
작업 시간이 있는 편이고 경쟁이 적은 환경이라면 mutex를 사용(효율 측면)하고, 그 반대의 경우라면 lock-free 를 사용(성능 측면)하는 것이 바람직 할 것 같습니다. 😄