Databse connection pool (데이터베이스 컨넥션 풀) C/C++

DB Pool

DB connection Pool(이하 컨넥션 풀)은 데이터베이스와의 통신을 위한 Connection 변수의 모음을 말한다.

collection
collection

데이터베이스와 단순 통신을 한다면 어떠한 요청이 있을 때 컨넥션 변수를 만들어 연결 및 처리 후 해제되는 과정을 거치는데, 프로그램이 종료될 때 까지 계속 데이터베이스를 사용한다면 연결/해제의 과정이 필요할까?

프로그램이 실행될 때 컨넥션을 미리 만들어 두었다가 요청이 있을 때 마다 처리만 해준다면 “생성/해제”라는 무지막지한 비용을 아낄 수 있을 것이다.

미리 만들어 두었다가 요청이 있을 때 마다 처리

사실 전에 만들었던 쓰레드 풀에 데이터베이스 컨넥션만 추가한 것이 되겠다.

환경

데이터베이스 풀을 직접 만들어보기 위해 아래와 같이 환경을 구성 하였습니다.

  • Windows 11 Pro (x64)
    • MSYS2
      • base-devel
      • Mingw-w64-x86_64-toolchain
      • Mingw-w64-x86_64-cmake
      • Mingw-w64-x86_64-spdlog
      • Mingw-w64-x86_64-postgresql
    • PostgreSQL 13.5
    • VS Code

설계

디렉토리 및 파일 구성

  • / (Root)
    • build
    • include
      • common.h
      • pool.h
    • lib
    • src
      • main.cpp
      • pool.cpp
    • CMakelists.txt

파일 설명

  1. common.h
    • src 폴더 하위 모든 소스에 공통적으로 사용할 객체 및 함수들의 선언이 되어있는 헤더
  2. pool.h / pool.cpp
    • 데이터베이스 풀에 관련 내용 선언 및 정의가 되어있는 파일들
  3. main.cpp
    • 데이터베이스 풀을 테스트할 메인 함수가 들어있는 파일
  4. CMakelists.txt
    • cmake config file

구현

common.h

/**
 * @file common.h
 * @author JongBin Park (jongbin@devbin.kr)
 * @brief 
 * @version 0.1
 * @date 2022-03-29
 * 
 * @copyright Copyright (c) 2022
 * 
 */

#ifndef COMMON_H
#define COMMON_H

#include <spdlog/spdlog.h>
#include <unistd.h>

#endif /* COMMON_H */
  1. spdlog : C++에서 사용할 수 있는 아주 유용한 Log 라이브러리
  2. unistd.h : POSIX 운영체제 API
    * 본인은 windows.h 대신 사용했고, 사실상 sleep() 밖에 쓰지 않음

pool.h

/**
 * @file pool.h
 * @author JongBin Park (jongbin@devbin.kr)
 * @brief 
 * @version 0.1
 * @date 2022-03-29
 * 
 * @copyright Copyright (c) 2022
 * 
 */

#ifndef POOL_H
#define POOL_H

#define WILL_DELETE_OBJECT
#define MAX_CONNECTION 10
#define CONN_STR "dbname   = 데이터베이스 명 \
                  user     = 데이터베이스 유저명 \
                  password = 데이터베이스 패스워드 \
                  host     = 데이터베이스 호스트 \
                  hostaddr = 호스트 IP \
                  port     = 5432"

#include "common.h"
#include <string>

#include "libpq-fe.h"
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <string>

using namespace std;
typedef struct tParam_s
{
    unsigned int id;

    PGconn *conn;
    PGresult *pgResult;
} tParam_t;

typedef struct query_s
{
    string query;
    void *(*callBack)(void *);
} query_t;
typedef vector<vector<char *>> * ResultTable;

class Pool {
private:
    static thread **pool;
    
    /* Critical section */
    static mutex mt;
    static condition_variable cond;
    static queue<query_t *> queue;

    static void dequeueQuery(void *tParam);
    /* Critical section */

public:
    Pool();
    ~Pool();

    static void init();

    static void enqueueQuery(string query, void *(*callBack)(void *));
};

#endif /* POOL_H */
  1. #define WILL_DELETE_OBJECT
    • 동적으로 만든 객체들이 추 후 delete 가 되는지 확인하기 위한 더미 매크로
  2. #define MAX_CONNECTION 10
    • 데이터베이스 컨텍션 풀의 갯수를 결정하는 매크로, 한 개의 쓰레드당 한 개의 컨넥션 풀을 가진다.
  3. #define CONN_STR
    • PostgreSQL 에 접속하기 위한 정보가 들어간 매크로
  4. #include “libpq-fe.h”
    • PostgreSQL 데이터베이스 C/C++ 라이브러리
  5. #include <thread>
    • C++11 부터 적용된 내장된 쓰레드 호환성을 위해 pthread.h를 사용해도 상관 없음
  6. #include <mutex>
    • 생성된 여러 쓰레드들의 동시 접근 제어를 하기위한 뮤텍스 헤더
  7. #include <condition_variable>\
    • 특정 조건에 쓰레드를 제어하기 위한 상태 변수 헤더
  8. typedef struct sParam_s {} tParam_t
    • 쓰레드가 생성될 때 갖는 구조체
  9. typedef query_s {} query_t
    • 데이터베이스에 어떤 요청을 할 때 필요한 정보를 갖는 구제초
  10. typedef vector<vector<char *>> * ResultTable
    • 데이터베이스 Select 결과를 담는 변수
  11. class Pool
    • 데이터베이스 컨넥션 풀 클래스

pool.cpp

/**
 * @file pool.cpp
 * @author JongBin Park (jongbin@devbin.kr)
 * @brief connection pool
 * @version 0.1
 * @date 2022-04-02
 * 
 * @copyright Copyright (c) 2022
 * 
 */
#include "pool.h"

thread **Pool::pool;

/* Critical section */
mutex Pool::mt;
condition_variable Pool::cond;
queue<query_t *> Pool::queue;
/* Critical section */

void Pool::enqueueQuery(string query, void *(*callBack)(void *))
{
    spdlog::trace("start enqueueQuery()");

    query_t *tmp = new query_t;
    tmp->query = query;
    tmp->callBack = callBack;

    mt.lock();
    queue.push(tmp);
    
    cond.notify_one();

    mt.unlock();
    spdlog::trace("end enqueueQuery()");
    return;
}

void Pool::dequeueQuery(void *tParam)
{
    tParam_t *t = (tParam_t *)tParam;
    string query;
    void *(*callBack)(void *) = nullptr;

    spdlog::trace("start dequeueQuery() [ID : {:d}]", t->id);    
    while(true)
    {
        unique_lock<mutex> lock(mt);
        if(queue.empty())
            cond.wait(lock);
        else
        {
            query = queue.front()->query;
            callBack = queue.front()->callBack;

            delete queue.front();
            queue.pop();
            lock.unlock();

            t->pgResult = PQexec(t->conn, query.c_str());

            if(PQresultStatus(t->pgResult) == PGRES_EMPTY_QUERY)
                spdlog::info("empty result [ID : {:d}]", t->id);
            else if(PQresultStatus(t->pgResult) == PGRES_COMMAND_OK)
                spdlog::info("success to execute but not result [ID : {:d}]", t->id);
            else if(PQresultStatus(t->pgResult) == PGRES_TUPLES_OK)
            {
                spdlog::info("success to execute and have result [ID : {:d}]", t->id);
                
                unsigned int i, j, row, col = 0;

                row = PQntuples(t->pgResult);
                col = PQnfields(t->pgResult);

                if(row != 0)
                {
                    /* FIXME: will be deleted : NO */
                    vector<vector<char *>> *executedQueryResult = new vector(row, vector<char *>(col, nullptr));

                    for(i=0;i<row;i++)
                    {
                        for(j=0;j<col;j++)
                        {
                            (*executedQueryResult)[i][j] = PQgetvalue(t->pgResult, i, j);
                            spdlog::trace("{:s}", PQgetvalue(t->pgResult, i, j));
                        }
                    }
                    spdlog::info("sucess to fetch db result");

                    if(callBack != nullptr)
                    {
                        callBack((vector<vector<char *>> *)executedQueryResult);
                        spdlog::info("sucess to execute callback function about query result");
                    }
                    else
                        spdlog::error("callback function pointer is nullptr");
                }
                else
                    spdlog::warn("result is empty");

            }
            else if(PQresultStatus(t->pgResult) == PGRES_BAD_RESPONSE)
            {
                spdlog::error("server bad response [ID : {:d}]", t->id);
                spdlog::error(PQresultErrorMessage(t->pgResult));
            }
            else if(PQresultStatus(t->pgResult) == PGRES_NONFATAL_ERROR)
            {
                spdlog::error("fail to execute but try again [ID : {:d}]", t->id);
                spdlog::error(PQresultErrorMessage(t->pgResult));
            }
            else if(PQresultStatus(t->pgResult) == PGRES_FATAL_ERROR)
            {
                spdlog::error("fatal error [ID : {:d}]", t->id);
                spdlog::error(PQresultErrorMessage(t->pgResult));
            }
                
            spdlog::trace(query.c_str());
            PQclear(t->pgResult);
        }
    }
    
    spdlog::trace("end dequeueQuery() [ID : {:d}]", t->id);
    return;
}

Pool::Pool()
{
    init();
    return;
}

Pool::~Pool()
{
    spdlog::trace("start ~Pool()");

    unsigned int i = 0;
    for(i=0;i<MAX_CONNECTION;i++)
    {
        delete *(pool + i);
    }

    delete pool;

    spdlog::info("Success to destory DB pool");

    spdlog::trace("end ~Pool()");
    return;
}

void Pool::init()
{
    spdlog::trace("start Pool()");

    unsigned int i = 0;
    
    tParam_t *tmpParam;

    pool = new thread*[MAX_CONNECTION];  /* will be deleted : OK */

    for(i=0;i<MAX_CONNECTION;i++)
    {
        tmpParam = new tParam_t;  /* will be deleted : OK */
        tmpParam->id = i;
        tmpParam->conn = PQconnectdb(CONN_STR);

        if(PQstatus(tmpParam->conn) != CONNECTION_OK)
        {
            spdlog::critical("fail to connect db server");
            spdlog::critical("check the server and configure");

            delete tmpParam;
            continue;
        }
        else
        {
            *(pool + i) = new thread(dequeueQuery, tmpParam);  /* will be deleted : OK */
            (*(pool + i))->detach();
            usleep(100);

            spdlog::info("Success to create DB pool [ID : {:d}]", i);
        }
        
    }

    spdlog::trace("end Pool()");

    return;
}
  1. init()
    • Pool 클래스 초기화
    • static 으로 선언 되어 있기 때문에 필요한 것들은 대부분 new를 통해 동적할당 해줌
  2. enqueueQuery()
    • 생성된 쓰레드가 바라보고 있는 큐에 요청을 넣는 함수
    • 쿼리와 콜백 함수가 들어있는 구조체를 동적으로 할당 후 처리가 완료되면 delete
  3. dequeueQuery()
    • 실제 생성되는 쓰레드 함수
    • 무한으로 루프를 돌며 큐에 적재된 요청을 꺼내 처리

main.cpp

#include "common.h"
#include "pool.h"

void *printResult(void *param)
{
    while(1==1)
    {
        if(param != nullptr)
        {
            spdlog::critical("value : {:s} [address={:p}]", ((*(ResultTable)param))[0][0], (void*)param);
            delete (ResultTable)param;
            break;
        }
        else
        {
            sleep(1);
        }
    }

    return nullptr;
}

int main()
{
    spdlog::set_level(spdlog::level::info);
    spdlog::info("start main()");

    unsigned int i = 0;

    Pool::init();

    sleep(2);

    for(i=0;i<100000000000;i++)
    {
        Pool::enqueueQuery("select * from test2 where 1=1", printResult);
    }

    sleep(3600);
    spdlog::info("end main()");
    return 0;
}
  1. printResult()
    • 콜백함수
    • 큐에 넣은 요청이 완료 되면 실행되는 함수이며 select 의 결과 값을 인자로 받아 사용
  2. main()
    • 프로그램이 시작될 때 실행되는 함수
    • 컨넥션 풀을 초기화하고 10개의 컨넥션 풀을 갖고 천 억번의 요청
%d 블로거가 이것을 좋아합니다: