Databse connection pool (데이터베이스 컨넥션 풀) C/C++

Databse connection pool (데이터베이스 컨넥션 풀) C/C++

DB Pool

DB connection Pool(이하 컨넥션 풀)은 데이터베이스와의 통신을 위한 Connection 변수의 모음을 말한다.

데이터베이스와 단순 통신을 한다면 어떠한 요청이 있을 때 컨넥션 변수를 만들어 연결 및 처리 후 해제되는 과정을 거치는데, 프로그램이 종료될 때 까지 계속 데이터베이스를 사용한다면 연결/해제의 과정이 필요할까?

프로그램이 실행될 때 컨넥션을 미리 만들어 두었다가 요청이 있을 때 마다 처리만 해준다면 “생성/해제”라는 무지막지한 비용을 아낄 수 있을 것이다.

미리 만들어 두었다가 요청이 있을 때 마다 처리

사실 전에 만들었던 쓰레드 풀에 데이터베이스 컨넥션만 추가한 것이 되겠다.

환경

데이터베이스 풀을 직접 만들어보기 위해 아래와 같이 환경을 구성 하였습니다.

  • Windows 11 Pro (x64)
    • MSYS2
      • base-devel
      • Mingw-w64-x86_64-toolchain
      • Mingw-w64-x86_64-cmake
      • Mingw-w64-x86_64-spdlog
      • Mingw-w64-x86_64-postgresql
    • PostgreSQL 13.5
    • VS Code

설계

디렉토리 및 파일 구성

  • / (Root)
    • build
    • include
      • common.h
      • pool.h
    • lib
    • src
      • main.cpp
      • pool.cpp
    • CMakelists.txt

파일 설명

  1. common.h
    • src 폴더 하위 모든 소스에 공통적으로 사용할 객체 및 함수들의 선언이 되어있는 헤더
  2. pool.h / pool.cpp
    • 데이터베이스 풀에 관련 내용 선언 및 정의가 되어있는 파일들
  3. main.cpp
    • 데이터베이스 풀을 테스트할 메인 함수가 들어있는 파일
  4. CMakelists.txt
    • cmake config file

구현

/** 
 * @file common.h 
 * @author JongBin Park (jongbin@devbin.kr) 
 * @brief * @version 0.1 
 * @date 2022-03-29 
 * 
 * @copyright Copyright (c) 2022 
 * 
 */ 

#ifndef COMMON_H 
#define COMMON_H 

#include <spdlog/spdlog.h> 
#include <unistd.h> 

#endif /* COMMON_H */

  1. spdlog : C++에서 사용할 수 있는 아주 유용한 Log 라이브러리
  2. unistd.h : POSIX 운영체제 API
    * 본인은 windows.h 대신 사용했고, 사실상 sleep() 밖에 쓰지 않음

pool.h

/** 
 * @file pool.h 
 * @author JongBin Park (jongbin@devbin.kr) 
 * @brief 
 * @version 0.1 
 * @date 2022-03-29 
 * 
 * @copyright Copyright (c) 2022 
 * 
 */ 
#ifndef POOL_H 
#define POOL_H 

#define WILL_DELETE_OBJECT 
#define MAX_CONNECTION 10 
#define CONN_STR "dbname = 데이터베이스 명 \ 
                  user = 데이터베이스 유저명 \ 
                  password = 데이터베이스 패스워드 \ 
                  host = 데이터베이스 호스트 \
                  hostaddr = 호스트 IP \ 
                  port = 5432" 
                  
#include "common.h" 
#include <string> 
#include "libpq-fe.h" 
#include <thread>
#include <mutex> 
#include <condition_variable> 
#include <queue> 
#include <vector> 
#include <string> 

using namespace std; 

typedef struct tParam_s 
{ 
    unsigned int id; 
    PGconn *conn; 
    PGresult *pgResult; 
} tParam_t; 

typedef struct query_s 
{ 
    string query; 
    void *(*callBack)(void *); 
} query_t; 

typedef vector<vector<char *>> * ResultTable; 

class Pool 
{ 
private: 
    static thread **pool; 

    /* Critical section */ 
    static mutex mt; 
    static condition_variable cond; 
    static queue<query_t *> queue; 
    static void dequeueQuery(void *tParam); 
    /* Critical section */ 
    
public: 
    Pool(); 
    ~Pool(); 

    static void init(); 
    static void enqueueQuery(string query, void *(*callBack)(void *)); 
}; 

#endif /* POOL_H */

  1. #define WILL_DELETE_OBJECT
    • 동적으로 만든 객체들이 추 후 delete 가 되는지 확인하기 위한 더미 매크로
  2. #define MAX_CONNECTION 10
    • 데이터베이스 컨텍션 풀의 갯수를 결정하는 매크로, 한 개의 쓰레드당 한 개의 컨넥션 풀을 가진다.
  3. #define CONN_STR
    • PostgreSQL 에 접속하기 위한 정보가 들어간 매크로
  4. #include “libpq-fe.h”
    • PostgreSQL 데이터베이스 C/C++ 라이브러리
  5. #include <thread>
    • C++11 부터 적용된 내장된 쓰레드 호환성을 위해 pthread.h를 사용해도 상관 없음
  6. #include <mutex>
    • 생성된 여러 쓰레드들의 동시 접근 제어를 하기위한 뮤텍스 헤더
  7. #include <condition_variable>\
    • 특정 조건에 쓰레드를 제어하기 위한 상태 변수 헤더
  8. typedef struct sParam_s {} tParam_t
    • 쓰레드가 생성될 때 갖는 구조체
  9. typedef query_s {} query_t
    • 데이터베이스에 어떤 요청을 할 때 필요한 정보를 갖는 구제초
  10. typedef vector<vector<char *>> * ResultTable
    • 데이터베이스 Select 결과를 담는 변수
  11. class Pool
    • 데이터베이스 컨넥션 풀 클래스

pool.cpp

/** 
 * @file pool.cpp 
 * @author JongBin Park (jongbin@devbin.kr) 
 * @brief connection pool 
 * @version 0.1 
 * @date 2022-04-02 
 * 
 * @copyright Copyright (c) 2022 
 * 
 */ 
#include "pool.h" 

thread **Pool::pool; 
/* Critical section */ 
mutex Pool::mt; 
condition_variable Pool::cond; 
queue<query_t *> Pool::queue; 
/* Critical section */ 

void Pool::enqueueQuery(string query, void *(*callBack)(void *)) 
{ 
    spdlog::trace("start enqueueQuery()"); 
    query_t *tmp = new query_t; 
    tmp->query = query; 
    tmp->callBack = callBack; 

    mt.lock(); 
    queue.push(tmp); 
    cond.notify_one(); 
    mt.unlock(); 

    spdlog::trace("end enqueueQuery()"); 
    
    return; 
} 

void Pool::dequeueQuery(void *tParam) 
{ 
    tParam_t *t = (tParam_t *)tParam; 
    string query; 
    void *(*callBack)(void *) = nullptr; 
    spdlog::trace("start dequeueQuery() [ID : {:d}]", t->id); 
    
    while(true) 
    { 
        unique_lock<mutex> lock(mt); 
        if(queue.empty()) 
            cond.wait(lock); 
        else 
        { 
            query = queue.front()->query; 
            callBack = queue.front()->callBack; 

            delete queue.front(); 
            queue.pop(); 
            lock.unlock(); 
            t->pgResult = PQexec(t->conn, query.c_str()); 
            
            if(PQresultStatus(t->pgResult) == PGRES_EMPTY_QUERY) 
                spdlog::info("empty result [ID : {:d}]", t->id); 
            else if(PQresultStatus(t->pgResult) == PGRES_COMMAND_OK) 
                spdlog::info("success to execute but not result [ID : {:d}]", t->id); 
            else if(PQresultStatus(t->pgResult) == PGRES_TUPLES_OK) 
            { 
                spdlog::info("success to execute and have result [ID : {:d}]", t->id); 
                unsigned int i, j, row, col = 0; 
                row = PQntuples(t->pgResult); 
                col = PQnfields(t->pgResult); 
                if(row != 0) 
                { 
                    /* FIXME: will be deleted : NO */ 
                    vector<vector<char *>> *executedQueryResult = new vector(row, vector<char *>(col, nullptr)); 
                    for(i=0;i<row;i++) 
                    { 
                        for(j=0;j<col;j++) 
                        { 
                            (*executedQueryResult)[i][j] = PQgetvalue(t->pgResult, i, j); 
                            spdlog::trace("{:s}", PQgetvalue(t->pgResult, i, j)); 
                        } 
                    } 
                    
                    spdlog::info("sucess to fetch db result"); 
                    if(callBack != nullptr) 
                    { 
                        callBack((vector<vector<char *>> *)executedQueryResult); 
                        spdlog::info("sucess to execute callback function about query result"); 
                    } 
                    else 
                        spdlog::error("callback function pointer is nullptr"); 
                } 
                else 
                    spdlog::warn("result is empty"); 
            } 
            else if(PQresultStatus(t->pgResult) == PGRES_BAD_RESPONSE) 
            { 
                spdlog::error("server bad response [ID : {:d}]", t->id); 
                spdlog::error(PQresultErrorMessage(t->pgResult)); 
            } 
            else if(PQresultStatus(t->pgResult) == PGRES_NONFATAL_ERROR) 
            {
                spdlog::error("fail to execute but try again [ID : {:d}]", t->id); 
                spdlog::error(PQresultErrorMessage(t->pgResult)); 
            } 
            else if(PQresultStatus(t->pgResult) == PGRES_FATAL_ERROR) 
            { 
                spdlog::error("fatal error [ID : {:d}]", t->id); 
                spdlog::error(PQresultErrorMessage(t->pgResult)); 
            } 
            
            spdlog::trace(query.c_str()); PQclear(t->pgResult); 
        } 
    } 
    
    spdlog::trace("end dequeueQuery() [ID : {:d}]", t->id); 
    
    return; 
} 

Pool::Pool() 
{ 
    init(); 
    return; 
} 

Pool::~Pool() 
{ 
    spdlog::trace("start ~Pool()"); 
    unsigned int i = 0; 
    for(i=0;i<MAX_CONNECTION;i++) 
    { 
        delete *(pool + i); 
    } 

    delete pool; 
    spdlog::info("Success to destory DB pool"); 

    spdlog::trace("end ~Pool()"); 
    return; 
} 

void Pool::init() 
{ 
    spdlog::trace("start Pool()"); 
    unsigned int i = 0;
    tParam_t *tmpParam; 
    pool = new thread*[MAX_CONNECTION]; 
    
    /* will be deleted : OK */ 
    for(i=0;i<MAX_CONNECTION;i++) 
    { 
        tmpParam = new tParam_t; 
        /* will be deleted : OK */ 
        tmpParam->id = i; 
        tmpParam->conn = PQconnectdb(CONN_STR); 
        if(PQstatus(tmpParam->conn) != CONNECTION_OK) 
        { 
            spdlog::critical("fail to connect db server"); 
            spdlog::critical("check the server and configure"); 
            
            delete tmpParam; 
            continue; 
        } 
        else 
        { 
            *(pool + i) = new thread(dequeueQuery, tmpParam); 
            /* will be deleted : OK */ 
            (*(pool + i))->detach();
            usleep(100); 
            spdlog::info("Success to create DB pool [ID : {:d}]", i); 
        } 
    } 
    
    spdlog::trace("end Pool()"); 
    return; 
}

  1. init()
    • Pool 클래스 초기화
    • static 으로 선언 되어 있기 때문에 필요한 것들은 대부분 new를 통해 동적할당 해줌
  2. enqueueQuery()
    • 생성된 쓰레드가 바라보고 있는 큐에 요청을 넣는 함수
    • 쿼리와 콜백 함수가 들어있는 구조체를 동적으로 할당 후 처리가 완료되면 delete
  3. dequeueQuery()
    • 실제 생성되는 쓰레드 함수
    • 무한으로 루프를 돌며 큐에 적재된 요청을 꺼내 처리

main.cpp

#include "common.h" 
#include "pool.h" 

void *printResult(void *param) 
{ 
    while(1==1) 
    { 
        if(param != nullptr) 
        { 
            spdlog::critical("value : {:s} [address={:p}]", ((*(ResultTable)param))[0][0], (void*)param); 
            delete (ResultTable)param; 
            
            break; 
        } 
        else 
        { 
            sleep(1); 
        } 
    } 

    return nullptr; 
} 

int main() 
{ 
    spdlog::set_level(spdlog::level::info); 
    spdlog::info("start main()"); 
    
    unsigned int i = 0; 

    Pool::init(); 
    sleep(2); 
    
    for(i=0;i<100000000000;i++) 
    { 
        Pool::enqueueQuery("select * from test2 where 1=1", printResult); 
    } 
    
    sleep(3600); 
    spdlog::info("end main()"); 
    return 0; 
}

 

  1. printResult()
    • 콜백함수
    • 큐에 넣은 요청이 완료 되면 실행되는 함수이며 select 의 결과 값을 인자로 받아 사용
  2. main()
    • 프로그램이 시작될 때 실행되는 함수
    • 컨넥션 풀을 초기화하고 10개의 컨넥션 풀을 갖고 천 억번의 요청
%d 블로거가 이것을 좋아합니다: