지난번에 만들었던 스레드풀은 mutex를 이용하여 하나의 작업 큐(공유자원)을 lock-unlock 을 이용하여 각 스레드가 작업을 가져갔습니다.

스레드가 많고 작업 시간이 짧을 수록 더 많은 경쟁을 할 것인데 이로 인해 작업을 처리하는 비용보다 작업을 빼오는 오버헤드가 더 클 것 같습니다. 😨

💡 항상 Lock-free가 빠른 것은 아님!

따라서 벤치마킹 후 성능이 좋은 쪽을 선택해야 합니다.

🚀 Lock-free 작업 큐 구현

기존 스레드풀에서 thread-unsafety 였던 큐를 Lock-free 큐로 변경을 하려고 합니다. 그러기 위해 큐를 직접 구현해야 하는데 큐의 조건은 다음과 같습니다.

  1. 여러 스레드가 push를 할 수 있음 (Multi-Procedure)
    • bottom 을 가리키는 변수 또는 포인터를 다수의 스레드가 읽고 쓸 수 있음
    • 풀에 작업을 넣는 스레드는 특정 한 스레드가 아님
  2. 여러 스레드가 pop을 할 수 있음 (Multi-Consumer)
    • Top 을 가리키는 변수 또는 포인터를 다수의 스레드가 읽고 쓸 수 있음
    • 풀에 생성된 스레드들이 작업을 가져감

따라서 구현해야 하는 큐는 MPMC Queue 입니다. 큐를 만들 때 Linked list 처럼 노드를 그 때 마다 생성해서 잇는 방법이 있을테고, 고정된 버퍼를 미리 만들어 놓는 방법도 있을겁니다.

저는 고정된 버퍼에 순환하는 방식(Ring buffer)으로 만들어 보도록 하겠습니다. 👍

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
// queue.hpp
#ifndef __QUEUE_H__
#define __QUEUE_H__

#include<atomic>
#include<cstdint>

template<typename T, uint64_t CAPACITY = 1024>
class Queue {
    constexpr static uint8_t CACHE_LINE = 64;
    constexpr static uint64_t roundup_pow2(uint64_t n) {
        n -= 1;
        n |= n >> 1;
        n |= n >> 2;
        n |= n >> 4;
        n |= n >> 8;
        n |= n >> 16;
        n |= n >> 32;
        return n + 1;
    }
    struct Slot {
        // ABA Problem
        // CAS 연산에서 예상값 감지를 못하는 문제 (A→B→A)
        // 배열마다 sequence 를 두어 항상 증가하도록 함 (A→B→C...)
        std::atomic<uint64_t> sequence;
        T data{};
    };
public:
    Queue()
        : _top(0)
        , _bottom(0) {
        _buffer = std::make_unique<Slot[]>(_capacity);
        for(uint64_t idx = 0; idx < _capacity; idx++) {
            _buffer[idx].sequence.store(idx);
        }
    }
    ~Queue() {}

    template<typename U = T>
    bool push(U &&data) {
        uint64_t bottom = _bottom.load(std::memory_order_relaxed);
        for(;;) {
            Slot *slot = &_buffer[bottom & (_capacity - 1)];
            uint64_t seq = slot->sequence.load();
            int64_t diff = (int64_t)seq - (int64_t)bottom;
            if(diff == 0) {
                // 슬롯 비어있음 넣어도 됨
                if(_bottom.compare_exchange_weak(bottom, bottom + 1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    slot->data = std::forward<U>(data);
                    slot->sequence.store(seq + 1, std::memory_order_release);
                    return true;
                }
            } else if(diff > 0) {
                // 비어있지 않음
                bottom = _bottom.load(std::memory_order_relaxed);
                continue;
            } else {
                // 큐가 꽉참
                return false;
            }
        }
    }

    bool pop(T &data) {
        uint64_t top = _top.load(std::memory_order_relaxed);

        for(;;) {
            Slot *slot = &_buffer[top & (_capacity - 1)];
            uint64_t seq = slot->sequence.load(std::memory_order_acquire);
            int64_t diff = (int64_t)seq - (int64_t)(top + 1);

            if(diff == 0) {
                // 데이터 있음
                if(_top.compare_exchange_weak(top, top + 1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    data = slot->data;
                    slot->sequence.store(top + _capacity, std::memory_order_release);
                    return true;
                }
            } else {
                // 데이터 없음
                return false;
            }
        }
    }

    uint64_t size() {
        return _bottom.load(std::memory_order_relaxed) - _top.load(std::memory_order_relaxed);
    }

    bool empty() {
        return _bottom.load(std::memory_order_relaxed) - _top.load(std::memory_order_relaxed) == 0;
    }

private:
    const uint64_t _capacity = roundup_pow2(CAPACITY);
    std::unique_ptr<Slot[]> _buffer;
    alignas(CACHE_LINE) std::atomic<uint64_t> _top;
    alignas(CACHE_LINE) std::atomic<uint64_t> _bottom;
};

#endif // __QUEUE_H__

💡 CAS(Compare-And-Swap) 연산

  • 다수의 스레드가 Lock 없이 안전하게 데이터를 세팅할 수 있도록 하는 연산입니다.
  • “예상하는 값이 들어 있다면 새로운 값으로 변경해라”
    • 예상하는 값이 아니다 → 다른 스레드가 먼저 바꿨다 (실패 후 expect 에 최신값으로 갱신)
    • 예상하는 값이다 → 내가 선점했고 변경할 수 있다 (성공)
  • C++ 에서는 compare_exchange_weakcompare_exchange_strong*제공
  • CAS 연산에는 ABA 문제가 있음
    • 예상값이 사실 예상값이 아니었던 것
    • “예상하는 값 A” → “B 로 바뀜” → “A 로 다시 바뀜” → “CAS 연산 성공”
    • 같은 주소의 집에 세입자가 바뀌었지만 우체부는 인지 못하고 우편물 배달
    • Ring buffer 에서 _top, _bottom그리고 Slot 에 있는 sequence 는 항상 증분함(ABA 원천 제거)

memory_order 관련

_top_bottompush()/pop() 할 때 처음 로드하긴 하지만 중요하지 않음 → 어차피 큐가 찼는지, 사용할 수 있는지는 sequence 로 구분하며, 최종적으로 데이터를 삽입할 때는 CAS 연산 이용

✅ 정합성 관련

push thread 4개, pop thread 8개로 4억번 테스트 💯

더 정확한 테스트를 위해 TSAN 테스트 필요함

Thread Pool 구현

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// ThreadPool.h
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

#include<thread>
#include<mutex>
#include<condition_variable>
#include<functional>
#include<future>
#include<type_traits>
#include<vector>
#include<queue.hpp>

class ThreadPool {
    constexpr static uint32_t SPIN_LOCK_NUM = 1000;
    using Task = std::function<void()>;
public:
    ThreadPool(size_t pool_size);
    ~ThreadPool();

    template<typename Func, typename... Args>
    std::future<std::invoke_result_t<Func, Args...>> add(Func &&func, Args&&... args);

private:
    bool _destroy;
    size_t _pool_size;
    std::vector<std::thread> _threads;
    std::mutex _mutex;
    std::condition_variable _cv;
    Queue<Task> _tasks;

    int worker();
};

template<typename Func, typename... Args>
std::future<std::invoke_result_t<Func, Args...>> ThreadPool::add(Func &&func, Args&&... args) {
    std::shared_ptr<std::packaged_task<std::invoke_result_t<Func, Args...>()>> task = \
        std::make_shared<std::packaged_task<std::invoke_result_t<Func, Args...>()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    while(!_tasks.push([task] { (*task)(); })) {
        std::this_thread::yield();
    }
    _cv.notify_one();

    return task->get_future();
}

#endif // __THREAD_POOL_H__
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// ThreadPool.cc
#include<thread_pool.h>

ThreadPool::ThreadPool(size_t pool_size)
    : _destroy(false)
    , _pool_size(pool_size) {
    for(size_t idx = 0; idx < _pool_size; idx++) {
        _threads.emplace_back(&ThreadPool::worker, this);
    }
}

ThreadPool::~ThreadPool() {
    _mutex.lock();
    _destroy = true;
    _mutex.unlock();
    _cv.notify_all();

    for(std::thread &th : _threads) {
        if(th.joinable()) {
            th.join();
        }
    }
}

int ThreadPool::worker() {
    Task task = nullptr;
    uint32_t spin = 0;

    for(;;) {
        if(_tasks.pop(task)) {
            if(task) {
                task();
            }
        } else {
            if(spin < SPIN_LOCK_NUM) {
                std::this_thread::yield();
                spin++;
            } else {
                std::unique_lock<std::mutex> lock(_mutex);
                _cv.wait(lock, [this] { return _destroy || !_tasks.empty(); });
                if(_destroy) break;
                spin = 0;
            }
        }
    }

    return 0;
}

💡 Spin lock 스핀락(spinlock)은 임계 구역(critical section)에 진입이 불가능할 때 진입이 가능할 때까지 루프를 돌면서 재시도하는 방식으로 구현된 락을 가리킨다. (busy-waiting)

스핀락은 운영 체제의 스케줄링 지원을 받지 않기 때문에, 해당 스레드에 대한 문맥 교환이 일어나지 않는다. - Wikipeida

기존 스레드풀과 똑같고 단지 작업 큐만 바뀌었다.

🔬 Mutex vs Lock-free

앞서 말했듯이 짧은 작업과 스레드가 많은 환경에서 mutex 가 불리할 것으로 예상했습니다.

그럼 정말 그런지 테스트를 진행해 보겠습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<catch2/catch_all.hpp>
#include<spdlog/spdlog.h>
#include<thread_pool.h>

TEST_CASE("10,000,000 add/execute") {
    const size_t COUNT = 10'000'000;
    ThreadPool pool(12);
    std::vector<std::future<void>> ret(COUNT);
    std::atomic<size_t> count(0);
    std::chrono::high_resolution_clock::time_point start;
    std::chrono::high_resolution_clock::time_point end;

    start = std::chrono::high_resolution_clock::now();
    for(size_t i = 0; i < COUNT; i++) {
        ret[i] = pool.add([&] { count++; });
    }
    for(std::future<void> &f : ret) {
        f.wait();
    }
    end = std::chrono::high_resolution_clock::now();

    std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    REQUIRE(count.load() == COUNT);
    spdlog::info("time: {}ms", duration.count());
}

12개의 스레드를 가진 풀에 단순히 카운트만 올리는 작업을 1천만번 넣고 돌립니다.

No.mutex locklock freediffper
122.262 s4.026 s18.236 s-81.91%
222.798 s4.035 s18.763 s-82.3%
322.120 s4.043 s18.077 s-81.72%
422.591 s4.059 s18.532-82.03%
522.303 s4.032 s18.271-81.92%

💡 위에서도 말했듯이 항상 lock-free 가 빠른것은 아닙니다.

💡 mutex lock 은 경쟁이 생기면 스레드를 블록 시키고 컨텍스트 스위칭이 일어나 비용이 발생하는데 이 비용이 큼

CASCPU 원자적 명령어 한 두개로 끝나며 오버헤드는 작지만 실패했을 경우 계속 CPU 자원을 쓰므로(Loop) 효율적이진 않음

결론

✅ 스레드풀은, 특히 Lock-free 와 같이 복잡하고 디버깅이 어려운 자료구조를 사용하는 것들은 검증된 라이브러리를 사용합시다. 디버깅이 안됩니다 😭

oneTBB, Folly, Boost 등

작업 시간이 있는 편이고 경쟁이 적은 환경이라면 mutex를 사용(효율 측면)하고, 그 반대의 경우라면 lock-free 를 사용(성능 측면)하는 것이 바람직 할 것 같습니다. 😄