Thread pool 개념 및 C++ 설계/구현

Thread pool 개념 및 C++ 설계/구현

Thread pool

개요

Thread pool(이하 쓰레드풀)의 개념과 필요성을 알아보고 C++로 간단히 작성한 코드 리뷰

컴퓨터 프로그래밍에서 쓰레드란 프로그램의 실행점을 말하며 최대한 쉽게 설명하자면 Main() 함수에서 시작하는 흐름 외 다른 흐름을 만들어 병렬 처리를 할 수 있게 한다.

이전 포스팅에서 하나의 함수를 한 쓰레드로 올려 실행시켜 봤지만 그 과정이 너무 복잡하고 어떻게 써야할 지 몰라 그런가보다 하고 있다가 쓰레드 풀이라는 개념을 보고 다시한번 포스팅하게 됐다.

 

왜 필요한가?

쓰레드를 생성하고 회수하는데 많은 자원이 든다고 한다. 따라서 병렬처리해야 할 일들이 많다면 그만큼의 쓰레드를 생성하고 회수해야 한다.

여기서 몇몇 천재들은 이런 생각을 했을거다.

어차피 다른 곳에서 함수를 호출하는 건데 함수를 호출하는 쓰레드 몇개만 만들어서 계속 돌아가면서 호출하면 안될까?

threadpool
threadpool

이렇게 해서 만들어진게 “쓰레드 풀” 이면서, 단순히 작업할 것들을 Queue에 넣어놓고 차례대로 꺼내면서 처리하는 것이 전부이다.

 

필요한 것

두 개의 컨테이너가 필요하다.

  1. 각각의 함수를 호출할 쓰레드를 담아놓을 컨테이너
  2. 처리할 일들을 담아놓을 컨테이너

보통 예제를 보면 C++에 내장되어 있는 Vector 를 사용하는데, 본인은 본인이 구현한 Linked list를 사용하여 구현할 예정이다.

쓰레드에 관한 라이브러리는 POSIX Thread를 사용할 것이고, Mutex와 Conditional variable 개념이 필요하다. 두 개념은 간단히 얘기하면 아래와 같다.

  1. Mutex : Critical section에서의 통제
    • 쓰레드는 언제 어떻게 시작할 지 모르기 때문에 서로 다른 쓰레드에서 사용하는 “공유자원”이 있다면 적절하게 컨트롤 하는 기술이 필요하다.
  2. Conditional variable(이하 CV) : 쓰레드 사이의 통제
    • Mutex는 공유되는 자원을 컨트롤 했다면, CV는 쓰레드 사이의 통제를 하기위해 사용된다. 예를들어 쓰레드의 실행 순서가 있다.

 

설계 및 구현

설계
설계
  • linkedlist.h : 컨테이너
  • threadpool.h : 해당 기능 정의
  • threadpool.cpp : 해당 기능 구현
  • main.cpp : 해당 기능 테스트

#ifndef __THREAD__
#define __THREAD__

#include <unistd.h>
#include <pthread.h>
#include "logger.h"
#include "linkedlist.h"

class ThreadPool {
private:
    static bool isInit;
    static List<pthread_t *> threadList;
    static List<void (*)()> jobQueue;
    static pthread_attr_t threadAttribute;
    static pthread_mutex_t jobMutex;
    static pthread_cond_t jobConditionVariable;

    static void *worker(void *param);
public:
    ThreadPool();
    ThreadPool(size_t size);
    ~ThreadPool();

    bool enqueueJob(void (*job)());
    bool stop();
};


#endif /* __THREAD__ */

C++의 내장된 thread 기능을 사용하면 ThreadPool 클래스 안에 모든 것을 정의하여 구현할 수 있고, POSIX를 사용한다면 마음 편하게 전역 변수로 선언하여 사용하는 것이 좋다. 어찌됐던 POSIX를 사용하면 workerThread()가 전역변수여야 한다.
#include "thread.h"

// init static variables
bool ThreadPool::isInit;
List<pthread_t *> ThreadPool::threadList;
List<void (*)()> ThreadPool::jobQueue;
pthread_attr_t  ThreadPool::threadAttribute;
pthread_mutex_t ThreadPool::jobMutex;
pthread_cond_t ThreadPool::jobConditionVariable;

ThreadPool::ThreadPool()
{
    loglevel = TRACE;
    log(TRACE, "start", nullptr);
    isInit = true;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setdetachstate(&threadAttribute, PTHREAD_CREATE_DETACHED);
    pthread_mutex_init(&jobMutex, nullptr);
    pthread_cond_init(&jobConditionVariable, nullptr);

    size_t i;
    pthread_t *tmp = nullptr;

    for(i=0 ; i<5 ; i++)
    {
        tmp = new pthread_t;
        threadList.addItem(tmp);

        pthread_create(tmp, &threadAttribute, worker, (void *)tmp);
    }
    log(TRACE, "create thread variables and execute worker thread", nullptr);

    log(TRACE, "end", nullptr);
    return;
}

ThreadPool::ThreadPool(size_t size)
{
    loglevel = TRACE;
    log(TRACE, "start", nullptr);
    isInit = true;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setdetachstate(&threadAttribute, PTHREAD_CREATE_DETACHED);
    pthread_mutex_init(&jobMutex, nullptr);
    pthread_cond_init(&jobConditionVariable, nullptr);

    size_t i;
    pthread_t *tmp = nullptr;

    for(i=0 ; i<size ; i++)
    {
        tmp = new pthread_t;
        threadList.addItem(tmp);

        pthread_create(tmp, &threadAttribute, worker, (void *)tmp);
    }
    log(TRACE, "create thread variables and execute worker thread", nullptr);

    log(TRACE, "end", nullptr);
    return;
}

ThreadPool::~ThreadPool()
{
    log(TRACE, "start", nullptr);

    size_t i;
    for(i=0 ; i<threadList.getSize() ; i++)
    {
        pthread_cancel(*threadList.getItem(i));
        delete(threadList.getItem(i));
    }

    jobQueue.clear();
    threadList.clear();

    pthread_cond_destroy(&jobConditionVariable);
    pthread_mutex_destroy(&jobMutex);
    pthread_attr_destroy(&threadAttribute);

    log(TRACE, "end", nullptr);
    return;
}

bool ThreadPool::enqueueJob(void (*job)())
{
    bool retval = false;

    jobQueue.addItem(job);

    pthread_cond_signal(&jobConditionVariable);

    return retval;
}

bool ThreadPool::stop()
{
    log(TRACE, "start", nullptr);
    bool retval = false;
    
    pthread_mutex_lock(&jobMutex);
    if(isInit)
    {
        log(TRACE, "size %d", jobQueue.getSize());
        jobQueue.clear();
        isInit = false;
        log(TRACE, "execute stop command", nullptr);
    }
    else
    {
        log(ERROR, "ThreadPool not initialized", nullptr);
    }
    pthread_mutex_unlock(&jobMutex);

    log(TRACE, "end", nullptr);
    return retval;
}

void *ThreadPool::worker(void *param)
{
    void (*tmp)() = nullptr;

    while(1==1)
    {
        pthread_mutex_lock(&jobMutex);
        if(jobQueue.getSize() == 0)
        {
            tmp = nullptr;
            pthread_cond_wait(&jobConditionVariable, &jobMutex);
        }
        else
        {
            tmp = jobQueue.getItem(0);
            jobQueue.delIndex(0);
        }
        pthread_mutex_unlock(&jobMutex);

        if(tmp != nullptr)
            tmp();
    }
    return nullptr;
}

  1. ThreadPool() : 객체 생성 시 전역변수 초기화
    • jobQueue : 수행할 함수들을 담아놓는 컨테이너
    • threadList : workerThread 컨테이너
    • threadAttribute : 쓰레드 생성 옵션
    • jobConditionVariable : worker thread 사이의 제어를 위한 Conditional variable
    • jobMutex : 공통자원 “jobs”에 대한 Mutex
    • isInit: 쓰레드풀 상태
  2. ~ThreadPool() : 쓰레드풀 자원 반환
  3. enqueueJob() : 할 일을 컨테이너에 담고 큐에 일이 있다면 계속해서 worker thread를 호출하는 함수
  4. worker() : 컨테이너에 담긴 일을 처리하는 쓰레드

 

테스트

#include <iostream>
#include "thread.h"
#include "logger.h"

void foo()
{
    std::cout << "foo" << std::endl;
    sleep(3);
    return;
}
void bar(void)
{
    std::cout << "bar" << std::endl;
    sleep(3);
    return;
}

int main(int argc, char *argv[])
{
    loglevel = TRACE;

    log(TRACE, "%s", "start");

    ThreadPool p(1); // create 1 thread
    p.enqueueJob(foo);
    p.enqueueJob(foo);
    p.enqueueJob(foo);
    p.enqueueJob(foo);
    p.enqueueJob(foo);
    p.enqueueJob(foo);
    p.enqueueJob(bar);
    log(TRACE, "%s", "end enqueueJob()");

    // don't exit
    sleep(3600);

    return 0;
}

1개의 용량의 쓰레드풀을 생성하고 foo/bar함수를 호출하고 있다. 각 함수는 sleep(3); 을 호출하여 하나의 흐름으로 실행한다면 3 sec * 9 = 27초가 걸릴 것이다.

1 thread
1 thread

쓰레드가 한 개 생성 후 3초가 걸리는 일들을 하나씩 수행하는 모습이다.

 

이제 10개의 쓰레드를 만들어서 처리하면 어떤 모습일까?

10 thread
10 thread

9개의 foo/bar 함수를 동시에 실행시키는 모습을 볼 수 있다.

 

마치며

쓰레드 프로그래밍은 정말 어렵다는걸 다시 한번 느꼈다. 언제 어떻게 시작할지 모르는 것, Critical section, 쓰레드 조건, 반환 조건 등등 이번 계기를 통해 어느정도 개념이 잡히고 구현까지 해보게 되어 정말 좋은 공부가 됐다.

%d 블로거가 이것을 좋아합니다: