Thread Pool 사용 이유 및 C++ 구현

Thread ❓

💡 스레드는 어떠한 프로그램 내에서, 특히 프로세스 내에서 실행되는 흐름의 단위를 말한다. 일반적으로 한 프로그램은 하나의 스레드를 가지고 있지만, 프로그램 환경에 따라 둘 이상의 스레드를 동시에 실행할 수 있다. 이러한 실행 방식을 멀티 스레드라고 한다. wikipeida

멀티 스레드를 사용하지 않는 프로그램은 찾아보기 어렵습니다. 스레드가 하나인 환경에서 작업이 지연된다면 프로그램 전체가 멈추는 불상사가 생기기 때문이죠. 잘 만들어진 프레임워크를 사용하기 때문에 신경 쓸 필요가 없기 때문이죠!

소스 코드를 여러 단위로, 즉 병렬로 처리할 수 있게 해주는 스레드를 C++에서 어떻게 사용하는지 알아보도록 하겠습니다.

Modern C++

스레드는 소켓과 같이 운영체제에 종속되어 있습니다. 리눅스와 유닉스 계열에서 POSIX Thread를 사용하고, 윈도우 계열에서는 Win32 API 또는 MFC를 이용하여 스레드를 사용합니다. 물론 플랫폼에 구애 받지 않는 프로그래밍 언어라면 모두 똑같을겁니다!

하지만 Modern C++1)의 등장으로 이제 C++도 운영체제에 종속되지 않고 사용할 수 있습니다.

Thread VS Thread Pool 🥊

MethodExplanaton
Thread필요할 때마다 생성해서 사용하며 구현이 비교적 쉽고 빠름
Thread Pool스레드를 사용할 만큼 미리 할당하고 어떠한 작업이 들어왔을 때 실행하며 구현이 다소 어려움

스레드 풀 사용 이유

thread pool

스레드가 많으면 Context switch2) 비용이 많이 듭니다. 조금이나마 성능에 도움이 되려고 사용했던 스레드가 반대의 결과를 초래하면 안 되겠죠? 스레드가 어떤 기준으로 많고 적음을 판단할 수는 없지만 스레드의 무분별한 생성은 막아야 합니다. 예를 들면, 소켓 프로그래밍에서 클라이언트가 접속할 때마다 스레드를 생성한다던가 반복문 안에서의 스레드 생성 등등…

적당히 병렬로 처리할 수 있으면서, 무분별한 생성을 막는 메커니즘이 바로 스레드 풀입니다. 위의 비교에서도 말했다시피 미리 적당한 스레드를 생성해 놓습니다. 그리고 재워놓죠 🛌

처리해야 할 작업들은 큐에 넣고 들어왔을 때 하나씩 깨워서 처리하면 됩니다! 물론 처리 후에 다시 재워야겠죠?

구현

💡 Environment

  • Windows 10 64bit
    • MSYS2
      • make v4.3
      • g++ v12.2.0

아래 구현될 코드는 모두의 코드의 내용을 참고하였습니다. 정말 감사드립니다!

프로젝트 생성 🏗️

깃 허브에 올릴 계획이라 thread-lib-cpp라는 저장소 이름으로 생성했고 초기화를 위해 몇 가지 설정 파일과 함께 첫 번째 커밋을 했습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
thread-lib-cpp
│   .gitattributes
│   .gitignore
│   LICENSE
│   Makefile
│   README.md
├───inc
│       thread.h
└───src
        main.cpp
        thread.cpp

코딩

⚠️ 잘못된 점이 있다면 메일로 피드백 부탁드립니다!

thread.h

우선 헤더 파일에 어떤 것들이 필요한지 살펴봅시다.

  1. C++11의 스레드를 사용하기 위해 추가한 헤더 파일
1
2
3
#include <thread>
#include <mutex>
#include <condition_variable>
  1. 스레드 및 작업을 관리하기 위한 자료구조 헤더 파일
1
2
#include <queue>
#include <vector>
  1. 스레드로 돌린 함수의 결과를 비동기적으로 반환받기 위해 사용된 헤더 파일
1
2
#include <future>
#include <functional>
  1. Thread라는 클래스 생성
    • private 멤버
      1. 객체 소멸/스레드 시작 플래그
      2. 스레드 풀 사이즈
      3. 스레드 관리를 위한 1차원 배열
      4. 작업 관리를 위한 큐
      5. 재우고 깨우기 위한 condition_variable
      6. 공유 자원 접근 관리를 위한 mutex
      7. 실제 작업을 돌릴 worker
    • public 멤버
      1. 생성자/소멸자
      2. 스레드 시작 및 중지 관리 함수
      3. 현재 밀린 작업의 수를 반환하는 함수
      4. 작업을 넣는 함수
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Thread
{
private:
    bool isDestroy;
    bool isStop;

    size_t poolSize;

    std::vector<std::thread> threadList;
    std::queue<std::function<void()>> jobList;

    std::condition_variable cond;
    std::mutex mutex;
    bool canThisContinue();
    void workerThread();
public:
    Thread(int poolSize = 5);
    ~Thread();

    template<typename F, typename... Args>
    std::future<typename std::result_of<F(Args...)>::type> add(F &&func, Args&&... args);

    void start() { isStop = false; cond.notify_all(); }
    void stop() { isStop = true; }

    size_t getJobCount() { return jobList.size(); }
    static void sleep(size_t milliseconds)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
    }
};

따라서 헤더의 전체 코드는 아래와 같습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#ifndef __THREAD_H__
#define __THREAD_H__

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <future>
#include <functional>
#include <chrono>

class Thread
{
private:
    bool isDestroy;
    bool isStop;

    size_t poolSize;

    std::vector<std::thread> threadList;
    std::queue<std::function<void()>> jobList;

    std::condition_variable cond;
    std::mutex mutex;
    bool canThisContinue();
    void workerThread();
public:
    Thread(int poolSize = 5);
    ~Thread();

    template<typename F, typename... Args>
    std::future<typename std::result_of<F(Args...)>::type> add(F &&func, Args&&... args);

    void start() { isStop = false; cond.notify_all(); }
    void stop() { isStop = true; }

    size_t getJobCount() { return jobList.size(); }
    static void sleep(size_t milliseconds)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
    }
};

template<typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> Thread::add(F&& func, Args&&... args)
{
    using returnDataType = typename std::result_of<F(Args...)>::type;
    std::future<returnDataType> result;

    std::shared_ptr<std::packaged_task<returnDataType()>> job = \
        std::make_shared<std::packaged_task<returnDataType()>>(std::bind(std::forward<F>(func), std::forward<Args>(args)...));
    
    result = job->get_future();

    mutex.lock();
    jobList.push([job]() { (*job)(); });
    mutex.unlock();

    cond.notify_one();

    return result;
}

#endif /* __THREAD_H__ */

thread.cpp

이제 소스 파일에 어떻게 정의가 되어있는지 살펴보겠습니다.

  1. 생성자
    • 매개 변수로 풀 사이즈를 받고 있습니다.
    • 플래그 값을 초기화하고 풀 사이즈만큼 workerThread 함수로 스레드를 생성하고 있습니다.
    • 💡 Tips 여기서 람다 함수3)를 사용한 이유는 클래스의 멤버 함수로 스레드를 생성하지 못하기 때문입니다. 스레드로 사용하려면 static으로 정의되어야 합니다.

1
2
3
4
5
6
7
8
9
Thread::Thread(int poolSize)
{
    this->poolSize = poolSize;
    isDestroy = false;
    isStop = false;

    for(int i=0; i<poolSize; i++)
        threadList.emplace_back([this]() { workerThread(); });
}
  1. 소멸자
    • 생성자와 반대로 프로세스가 진행됩니다.
    • ⚠️ 메인 스레드가 종료되기 전에 생성된 모든 스레드를 중지시키고 정리될 때까지 기다려야 합니다. join()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Thread::~Thread()
{
    isStop = true;
    isDestroy = true;
    
    cond.notify_all();
    for(size_t i=0; i<poolSize; i++)
    {
        threadList.at(i).join();
    }
}
  1. workerThread
    • 일단 잠들어 있다가 큐에 작업이 들어오면 실행합니다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
void Thread::workerThread()
{
    std::function<void()> temp;

    while(true)
    {
        std::unique_lock<std::mutex> lock(mutex);
        cond.wait(lock, [this]() -> bool { return canThisContinue(); });
        if(isDestroy && isStop && jobList.empty())
            break;
        else if(!isDestroy && isStop)
            continue;

        temp = std::move(jobList.front());
        jobList.pop(); 
        lock.unlock();
        temp();
    }
    return;
}
  1. add
    • 작업을 추가합니다.
    • 템플릿 함수이기 때문에 헤더 파일에 있습니다.
    • 💡 std::result_of 템플릿 함수는 해당 함수의 반환 데이터형을 알 수 있습니다.
      또한 이 함수는 C++14에서 제거되고 std::invoke_result()로 대체 가능합니다.

    • using을 사용하여 긴 데이터 타입을 짧게 재정의 했습니다.
    • std::future 와 std::packaged_task를 사용하여 비동기 함수 반환 로직을 구현합니다.
    • std::mutex를 이용하여 공유 자원을 컨트롤 합니다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
template<typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> Thread::add(F&& func, Args&&... args)
{
    using returnDataType = typename std::result_of<F(Args...)>::type;
    std::future<returnDataType> result;

    std::shared_ptr<std::packaged_task<returnDataType()>> job = \
        std::make_shared<std::packaged_task<returnDataType()>>(std::bind(std::forward<F>(func), std::forward<Args>(args)...));
    
    result = job->get_future();

    mutex.lock();
    jobList.push([job]() { (*job)(); });
    mutex.unlock();

    cond.notify_one();

    return result;
}

전체 코드는 아래와 같습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include "thread.h"

Thread::Thread(int poolSize)
{
    this->poolSize = poolSize;
    isDestroy = false;
    isStop = false;

    for(int i=0; i<poolSize; i++)
        threadList.emplace_back([this]() { workerThread(); });
}

Thread::~Thread()
{
    isStop = true;
    isDestroy = true;
    
    cond.notify_all();
    for(size_t i=0; i<poolSize; i++)
    {
        threadList.at(i).join();
    }
}

bool Thread::canThisContinue() 
{
    bool result = true;

    if(!isDestroy && !isStop && jobList.empty()) result = false;
    else if(!isDestroy && isStop) result = false;

    return result;
}

void Thread::workerThread()
{
    std::function<void()> temp;

    while(true)
    {
        std::unique_lock<std::mutex> lock(mutex); //canThisContinue();
        cond.wait(lock, [this]() -> bool { return canThisContinue(); });
        if(isDestroy && isStop && jobList.empty())
            break;
        else if(!isDestroy && isStop)
            continue;

        temp = std::move(jobList.front());
        jobList.pop(); 
        lock.unlock();
        temp();
    }
    return;
}

테스트 🧪

thread-lib-cpp-github

index 0~999 까지 add() 호출 후 해당 함수에서 인덱스를 화면에 출력합니다.

  1. git clone https://github.com/JongBin-Park/thread-lib-cpp.git
  2. cd thread-lib-cpp.git
  3. make
  4. ./test.out
  5. make clean

test image

💡 스레드는 순서 없이, 무작위로 시작되기 때문에 주의 깊은 코딩이 필요합니다!

각주

  1. C++11 이후 버전을 의미하며 많은 문법 및 기능이 추가되었습니다. 더 많은 것을 알아보려면 이곳을 눌러주세요.
  2. 문맥 교환(文脈交換, context switch)이란 하나의 프로세스가 CPU를 사용 중인 상태에서 다른 프로세스가 CPU를 사용하도록 하기 위해, 이전의 프로세스의 상태(문맥)를 보관하고 새로운 프로세스의 상태를 적재하는 작업을 말한다. wikipedia
  3. 이름이 없는 함수, 익명 함수라고도 하며 따로 선언/정의할 필요 없이 그 자리에서 곧바로 작성할 수 있다.